Skip to content

Commit

Permalink
Merge pull request #107 from killbill/lock-account
Browse files Browse the repository at this point in the history
listener: add account-level locking
  • Loading branch information
sbrossie committed Nov 8, 2019
2 parents 2c70150 + 4a7092b commit 0c1b9a5
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 42 deletions.
2 changes: 2 additions & 0 deletions .idea/codeStyles/Project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pom.xml
Expand Up @@ -197,6 +197,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kill-bill.commons</groupId>
<artifactId>killbill-locker</artifactId>
</dependency>
<dependency>
<groupId>org.kill-bill.commons</groupId>
<artifactId>killbill-clock</artifactId>
Expand Down
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2014 Ning, Inc.
* Copyright 2014-2018 Groupon, Inc
* Copyright 2014-2018 The Billing Project, LLC
* Copyright 2014-2019 Groupon, Inc
* Copyright 2014-2019 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
Expand All @@ -18,8 +18,6 @@

package org.killbill.billing.plugin.analytics;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Hashtable;
import java.util.concurrent.Executor;

Expand All @@ -35,19 +33,24 @@
import org.killbill.billing.plugin.analytics.api.user.AnalyticsUserApi;
import org.killbill.billing.plugin.analytics.core.AnalyticsHealthcheck;
import org.killbill.billing.plugin.analytics.dao.BusinessDBIProvider;
import org.killbill.billing.plugin.analytics.http.AnalyticsHealthcheckResource;
import org.killbill.billing.plugin.analytics.http.AnalyticsAccountResource;
import org.killbill.billing.plugin.analytics.http.AnalyticsHealthcheckResource;
import org.killbill.billing.plugin.analytics.http.ReportsResource;
import org.killbill.billing.plugin.analytics.reports.ReportsConfiguration;
import org.killbill.billing.plugin.analytics.reports.ReportsUserApi;
import org.killbill.billing.plugin.analytics.reports.scheduler.JobsScheduler;
import org.killbill.billing.plugin.api.notification.PluginConfigurationEventHandler;
import org.killbill.billing.plugin.core.resources.jooby.PluginApp;
import org.killbill.billing.plugin.core.resources.jooby.PluginAppBuilder;
import org.killbill.billing.plugin.dao.PluginDao;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.clock.Clock;
import org.killbill.commons.embeddeddb.EmbeddedDB;
import org.killbill.commons.jdbi.mapper.LowerToCamelBeanMapperFactory;
import org.killbill.commons.locker.GlobalLocker;
import org.killbill.commons.locker.memory.MemoryGlobalLocker;
import org.killbill.commons.locker.mysql.MySqlGlobalLocker;
import org.killbill.commons.locker.postgresql.PostgreSQLGlobalLocker;
import org.killbill.notificationq.DefaultNotificationQueueService;
import org.killbill.notificationq.api.NotificationQueueConfig;
import org.killbill.notificationq.dao.NotificationEventModelDao;
Expand Down Expand Up @@ -99,13 +102,34 @@ public void start(final BundleContext context) throws Exception {
final AnalyticsConfiguration globalConfiguration = analyticsConfigurationHandler.createConfigurable(configProperties.getProperties());
analyticsConfigurationHandler.setDefaultConfigurable(globalConfiguration);

analyticsListener = new AnalyticsListener(roOSGIkillbillAPI, dataSource, configProperties, executor, killbillClock, analyticsConfigurationHandler, notificationQueueService);
final EmbeddedDB.DBEngine dbEngine = PluginDao.getDBEngine(dataSource.getDataSource());
final GlobalLocker locker;
switch (dbEngine) {
case MYSQL:
locker = new MySqlGlobalLocker(dataSource.getDataSource());
break;
case POSTGRESQL:
locker = new PostgreSQLGlobalLocker(dataSource.getDataSource());
break;
case GENERIC:
case H2:
default:
locker = new MemoryGlobalLocker();
break;
}
analyticsListener = new AnalyticsListener(roOSGIkillbillAPI,
dataSource,
configProperties,
executor,
locker,
killbillClock,
analyticsConfigurationHandler,
notificationQueueService);

jobsScheduler = new JobsScheduler(dataSource, killbillClock, notificationQueueService);

final ReportsConfiguration reportsConfiguration = new ReportsConfiguration(dataSource, jobsScheduler);

final EmbeddedDB.DBEngine dbEngine = getDbEngine();
final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(roOSGIkillbillAPI, dataSource, configProperties, executor, killbillClock, analyticsConfigurationHandler);
reportsUserApi = new ReportsUserApi(roOSGIkillbillAPI, dataSource, configProperties, dbEngine, reportsConfiguration, jobsScheduler);

Expand Down Expand Up @@ -170,29 +194,4 @@ private void registerHealthcheck(final BundleContext context, final AnalyticsHea
props.put(OSGIPluginProperties.PLUGIN_NAME_PROP, PLUGIN_NAME);
registrar.registerService(context, Healthcheck.class, healthcheck, props);
}

private EmbeddedDB.DBEngine getDbEngine() throws SQLException {
Connection connection = null;
String databaseProductName = null;
try {
connection = dataSource.getDataSource().getConnection();
databaseProductName = connection.getMetaData().getDatabaseProductName();
} finally {
if (connection != null) {
connection.close();
}
}

final EmbeddedDB.DBEngine dbEngine;
if ("H2".equalsIgnoreCase(databaseProductName)) {
dbEngine = EmbeddedDB.DBEngine.H2;
} else if ("MySQL".equalsIgnoreCase(databaseProductName)) {
dbEngine = EmbeddedDB.DBEngine.MYSQL;
} else if ("PostgreSQL".equalsIgnoreCase(databaseProductName)) {
dbEngine = EmbeddedDB.DBEngine.POSTGRESQL;
} else {
dbEngine = EmbeddedDB.DBEngine.GENERIC;
}
return dbEngine;
}
}
Expand Up @@ -53,6 +53,9 @@
import org.killbill.billing.util.callcontext.TenantContext;
import org.killbill.billing.util.callcontext.UserType;
import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLock;
import org.killbill.commons.locker.GlobalLocker;
import org.killbill.commons.locker.LockFailedException;
import org.killbill.notificationq.DefaultNotificationQueueService;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationEventWithMetadata;
Expand Down Expand Up @@ -104,18 +107,21 @@ public class AnalyticsListener implements OSGIKillbillEventDispatcher.OSGIKillbi
private final AllBusinessObjectsDao allBusinessObjectsDao;
private final CurrencyConversionDao currencyConversionDao;
private final NotificationQueue jobQueue;
private final GlobalLocker locker;
private final Clock clock;
private final AnalyticsConfigurationHandler analyticsConfigurationHandler;

public AnalyticsListener(final OSGIKillbillAPI osgiKillbillAPI,
final OSGIKillbillDataSource osgiKillbillDataSource,
final OSGIConfigPropertiesService osgiConfigPropertiesService,
final Executor executor,
final GlobalLocker locker,
final Clock clock,
final AnalyticsConfigurationHandler analyticsConfigurationHandler,
final DefaultNotificationQueueService notificationQueueService) throws NotificationQueueAlreadyExists {
this.osgiKillbillAPI = osgiKillbillAPI;
this.osgiConfigPropertiesService = osgiConfigPropertiesService;
this.locker = locker;
this.clock = clock;
this.analyticsConfigurationHandler = analyticsConfigurationHandler;

Expand Down Expand Up @@ -304,6 +310,20 @@ private boolean jobsOverlap(final AnalyticsJob job, final NotificationEventWithM
}

private void handleAnalyticsJob(final AnalyticsJob job) throws AnalyticsRefreshException {
GlobalLock lock = null;
try {
lock = locker.lockWithNumberOfTries("ANALYTICS_REFRESH", job.getAccountId().toString(), 100);
handleAnalyticsJobWithLock(job);
} catch (final LockFailedException e) {
throw new RuntimeException(e);
} finally {
if (lock != null) {
lock.release();
}
}
}

private void handleAnalyticsJobWithLock(final AnalyticsJob job) throws AnalyticsRefreshException {
if (job.getEventType() == null) {
return;
}
Expand Down
@@ -1,6 +1,7 @@
/*
* Copyright 2010-2014 Ning, Inc.
* Copyright 2014 The Billing Project, LLC
* Copyright 2014-2019 Groupon, Inc
* Copyright 2014-2019 The Billing Project, LLC
*
* Ning licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
Expand Down Expand Up @@ -96,6 +97,8 @@
import org.killbill.billing.util.tag.Tag;
import org.killbill.billing.util.tag.TagDefinition;
import org.killbill.clock.ClockMock;
import org.killbill.commons.locker.GlobalLocker;
import org.killbill.commons.locker.memory.MemoryGlobalLocker;
import org.killbill.notificationq.DefaultNotificationQueueService;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
Expand All @@ -112,6 +115,8 @@ public abstract class AnalyticsTestSuiteNoDB {
private static final DateTime INVOICE_CREATED_DATE = new DateTime(2016, 1, 22, 10, 56, 53, DateTimeZone.UTC);
protected final Logger logger = LoggerFactory.getLogger(AnalyticsTestSuiteNoDB.class);

protected final GlobalLocker locker = new MemoryGlobalLocker();

protected final Long accountRecordId = 1L;
protected final Long subscriptionEventRecordId = 2L;
protected final Long invoiceRecordId = 3L;
Expand Down
@@ -1,6 +1,7 @@
/*
* Copyright 2010-2014 Ning, Inc.
* Copyright 2014-2016 The Billing Project, LLC
* Copyright 2014-2019 Groupon, Inc
* Copyright 2014-2019 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
Expand Down
Expand Up @@ -34,7 +34,14 @@ public class TestAnalyticsListener extends AnalyticsTestSuiteNoDB {

@Test(groups = "fast")
public void testBlacklist() throws Exception {
final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI, killbillDataSource, osgiConfigPropertiesService, null, clock, analyticsConfigurationHandler, notificationQueueService);
final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI,
killbillDataSource,
osgiConfigPropertiesService,
null,
locker,
clock,
analyticsConfigurationHandler,
notificationQueueService);

// Other accounts are blacklisted
Assert.assertFalse(analyticsListener.isAccountBlacklisted(UUID.randomUUID()));
Expand Down Expand Up @@ -62,7 +69,14 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
});

final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI, killbillDataSource, osgiConfigPropertiesService, null, clock, analyticsConfigurationHandler, notificationQueueService);
final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI,
killbillDataSource,
osgiConfigPropertiesService,
null,
locker,
clock,
analyticsConfigurationHandler,
notificationQueueService);

Assert.assertTrue(analyticsListener.shouldIgnoreEvent(new AnalyticsJob(cfEvent)));
Assert.assertFalse(analyticsListener.shouldIgnoreEvent(new AnalyticsJob(accountEvent)));
Expand Down
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2014 Ning, Inc.
* Copyright 2014-2018 Groupon, Inc
* Copyright 2014-2018 The Billing Project, LLC
* Copyright 2014-2019 Groupon, Inc
* Copyright 2014-2019 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
Expand All @@ -22,22 +22,28 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.awaitility.Awaitility;
import org.killbill.billing.notification.plugin.api.ExtBusEvent;
import org.killbill.billing.notification.plugin.api.ExtBusEventType;
import org.killbill.notificationq.api.NotificationEventWithMetadata;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import org.awaitility.Awaitility;

import com.google.common.collect.Iterables;

public class TestAnalyticsNotificationQueue extends AnalyticsTestSuiteWithEmbeddedDB {

@Test(groups = "slow")
public void testSendOneEvent() throws Exception {
final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI, killbillDataSource, osgiConfigPropertiesService, BusinessExecutor.newCachedThreadPool(osgiConfigPropertiesService), clock, analyticsConfigurationHandler, notificationQueueService);
final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI,
killbillDataSource,
osgiConfigPropertiesService,
BusinessExecutor.newCachedThreadPool(osgiConfigPropertiesService),
locker,
clock,
analyticsConfigurationHandler,
notificationQueueService);
analyticsListener.start();

// Verify the original state
Expand All @@ -62,7 +68,14 @@ public Boolean call() throws Exception {

@Test(groups = "slow")
public void testVerifyNoDups() throws Exception {
final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI, killbillDataSource, osgiConfigPropertiesService, BusinessExecutor.newCachedThreadPool(osgiConfigPropertiesService), clock, analyticsConfigurationHandler, notificationQueueService);
final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI,
killbillDataSource,
osgiConfigPropertiesService,
BusinessExecutor.newCachedThreadPool(osgiConfigPropertiesService),
locker,
clock,
analyticsConfigurationHandler,
notificationQueueService);
// Don't start the dequeuer
Assert.assertEquals(Iterables.<NotificationEventWithMetadata>size(analyticsListener.getJobQueue().getFutureNotificationForSearchKeys(accountRecordId, tenantRecordId)), 0);

Expand Down

0 comments on commit 0c1b9a5

Please sign in to comment.