Skip to content

Commit

Permalink
Merge pull request #106 from killbill/fix-for-105
Browse files Browse the repository at this point in the history
listener: refresh payment rows when CBA is distributed
  • Loading branch information
sbrossie committed Nov 5, 2019
2 parents 27c7d0d + 225bd79 commit 10b3d59
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 14 deletions.
Expand Up @@ -17,12 +17,10 @@

package org.killbill.billing.plugin.analytics;

import org.killbill.billing.notification.plugin.api.ExtBusEventType;

public abstract class AnalyticsJobHierarchy {

public static Group fromEventType(final ExtBusEventType eventType) {
switch (eventType) {
public static Group fromEventType(final AnalyticsJob job) {
switch (job.getEventType()) {
// Account information is denormalized across all tables
case ACCOUNT_CREATION:
case ACCOUNT_CHANGE:
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.killbill.billing.plugin.analytics;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Executor;
Expand All @@ -27,6 +28,8 @@

import org.joda.time.DateTime;
import org.killbill.billing.ObjectType;
import org.killbill.billing.invoice.api.Invoice;
import org.killbill.billing.invoice.api.InvoiceApiException;
import org.killbill.billing.notification.plugin.api.ExtBusEvent;
import org.killbill.billing.osgi.libs.killbill.OSGIConfigPropertiesService;
import org.killbill.billing.osgi.libs.killbill.OSGIKillbillAPI;
Expand All @@ -43,9 +46,11 @@
import org.killbill.billing.plugin.analytics.dao.BusinessSubscriptionTransitionDao;
import org.killbill.billing.plugin.analytics.dao.CurrencyConversionDao;
import org.killbill.billing.plugin.analytics.dao.factory.BusinessContextFactory;
import org.killbill.billing.plugin.api.PluginTenantContext;
import org.killbill.billing.util.api.RecordIdApi;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.TenantContext;
import org.killbill.billing.util.callcontext.UserType;
import org.killbill.clock.Clock;
import org.killbill.notificationq.DefaultNotificationQueueService;
Expand All @@ -66,6 +71,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;

import static org.killbill.billing.notification.plugin.api.ExtBusEventType.PAYMENT_SUCCESS;
import static org.killbill.billing.plugin.analytics.AnalyticsActivator.ANALYTICS_QUEUE_SERVICE;

public class AnalyticsListener implements OSGIKillbillEventDispatcher.OSGIKillbillEventHandler {
Expand Down Expand Up @@ -186,13 +192,30 @@ public void handleKillbillEvent(final ExtBusEvent killbillEvent) {
return;
}

AnalyticsJob job = new AnalyticsJob(killbillEvent);
if (AnalyticsJobHierarchy.fromEventType(job) == Group.INVOICES) {
try {
final TenantContext tenantContext = new PluginTenantContext(killbillEvent.getAccountId(), killbillEvent.getTenantId());
final Invoice invoice = osgiKillbillAPI.getInvoiceUserApi().getInvoice(killbillEvent.getObjectId(), tenantContext);
if (BigDecimal.ZERO.compareTo(invoice.getCreditedAmount()) != 0 && invoice.getNumberOfPayments() > 0) {
// The invoice has payments and CBA was updated: payment rows must be updated
// See https://github.com/killbill/killbill-analytics-plugin/issues/105
job = new AnalyticsJob(PAYMENT_SUCCESS,
killbillEvent.getObjectType(),
killbillEvent.getObjectId(),
killbillEvent.getAccountId(),
killbillEvent.getTenantId());
}
} catch (final InvoiceApiException e) {
logger.warn("Unable to retrieve InvoiceUserApi for event {}, payment data might be stale", killbillEvent);
}
}

// Events we don't care about
if (shouldIgnoreEvent(killbillEvent)) {
if (shouldIgnoreEvent(job)) {
return;
}

final AnalyticsJob job = new AnalyticsJob(killbillEvent);

Long accountRecordId = null;
Long tenantRecordId = null;
final RecordIdApi recordIdApi = osgiKillbillAPI.getRecordIdApi();
Expand Down Expand Up @@ -273,10 +296,10 @@ public boolean apply(final NotificationEventWithMetadata<AnalyticsJob> notificat
// Does the specified existing notification overlap with this job?
private boolean jobsOverlap(final AnalyticsJob job, final NotificationEventWithMetadata<AnalyticsJob> notificationEventForExistingJob) {
final AnalyticsJob existingJob = notificationEventForExistingJob.getEvent();
final AnalyticsJobHierarchy.Group existingHierarchyGroup = AnalyticsJobHierarchy.fromEventType(existingJob.getEventType());
final AnalyticsJobHierarchy.Group existingHierarchyGroup = AnalyticsJobHierarchy.fromEventType(existingJob);

return existingJob.getAccountId().equals(job.getAccountId()) &&
(existingHierarchyGroup.equals(AnalyticsJobHierarchy.fromEventType(job.getEventType())) ||
(existingHierarchyGroup.equals(AnalyticsJobHierarchy.fromEventType(job)) ||
AnalyticsJobHierarchy.Group.ALL.equals(existingHierarchyGroup));
}

Expand All @@ -288,7 +311,7 @@ private void handleAnalyticsJob(final AnalyticsJob job) throws AnalyticsRefreshE
final CallContext callContext = new AnalyticsCallContext(job, clock);
final BusinessContextFactory businessContextFactory = new BusinessContextFactory(job.getAccountId(), callContext, currencyConversionDao, osgiKillbillAPI, osgiConfigPropertiesService, clock, analyticsConfigurationHandler);

final Group group = AnalyticsJobHierarchy.fromEventType(job.getEventType());
final Group group = AnalyticsJobHierarchy.fromEventType(job);
logger.info("Starting {} Analytics refresh for account {}", group, businessContextFactory.getAccountId());
switch (group) {
case ALL:
Expand Down Expand Up @@ -326,8 +349,8 @@ protected boolean isAccountBlacklisted(@Nullable final UUID accountId) {
}

@VisibleForTesting
protected boolean shouldIgnoreEvent(final ExtBusEvent event) {
return Iterables.find(ignoredGroups, Predicates.<Group>equalTo(AnalyticsJobHierarchy.fromEventType(event.getEventType())), null) != null;
protected boolean shouldIgnoreEvent(final AnalyticsJob job) {
return Iterables.find(ignoredGroups, Predicates.<Group>equalTo(AnalyticsJobHierarchy.fromEventType(job)), null) != null;
}

@VisibleForTesting
Expand Down
Expand Up @@ -64,7 +64,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI, killbillDataSource, osgiConfigPropertiesService, null, clock, analyticsConfigurationHandler, notificationQueueService);

Assert.assertTrue(analyticsListener.shouldIgnoreEvent(cfEvent));
Assert.assertFalse(analyticsListener.shouldIgnoreEvent(accountEvent));
Assert.assertTrue(analyticsListener.shouldIgnoreEvent(new AnalyticsJob(cfEvent)));
Assert.assertFalse(analyticsListener.shouldIgnoreEvent(new AnalyticsJob(accountEvent)));
}
}

0 comments on commit 10b3d59

Please sign in to comment.