From 225bd79e64dfedd5bdd1204411c1ead22032e0b1 Mon Sep 17 00:00:00 2001 From: Pierre-Alexandre Meyer Date: Mon, 4 Nov 2019 06:40:38 +0000 Subject: [PATCH] listener: refresh payment rows when CBA is distributed In https://github.com/killbill/killbill-analytics-plugin/issues/93, we stopped refreshing payments for INVOICE_CREATION and INVOICE_ADJUSTMENT events. Payment rows still need to be updated (if they exist) when CBA is distributed, as the balance may change. This fixes https://github.com/killbill/killbill-analytics-plugin/issues/105. Signed-off-by: Pierre-Alexandre Meyer --- .../analytics/AnalyticsJobHierarchy.java | 6 +-- .../plugin/analytics/AnalyticsListener.java | 39 +++++++++++++++---- .../analytics/TestAnalyticsListener.java | 4 +- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/killbill/billing/plugin/analytics/AnalyticsJobHierarchy.java b/src/main/java/org/killbill/billing/plugin/analytics/AnalyticsJobHierarchy.java index 1d3b4126..21685c09 100644 --- a/src/main/java/org/killbill/billing/plugin/analytics/AnalyticsJobHierarchy.java +++ b/src/main/java/org/killbill/billing/plugin/analytics/AnalyticsJobHierarchy.java @@ -17,12 +17,10 @@ package org.killbill.billing.plugin.analytics; -import org.killbill.billing.notification.plugin.api.ExtBusEventType; - public abstract class AnalyticsJobHierarchy { - public static Group fromEventType(final ExtBusEventType eventType) { - switch (eventType) { + public static Group fromEventType(final AnalyticsJob job) { + switch (job.getEventType()) { // Account information is denormalized across all tables case ACCOUNT_CREATION: case ACCOUNT_CHANGE: diff --git a/src/main/java/org/killbill/billing/plugin/analytics/AnalyticsListener.java b/src/main/java/org/killbill/billing/plugin/analytics/AnalyticsListener.java index 6870c1e0..5320fa10 100644 --- a/src/main/java/org/killbill/billing/plugin/analytics/AnalyticsListener.java +++ b/src/main/java/org/killbill/billing/plugin/analytics/AnalyticsListener.java @@ -19,6 +19,7 @@ package org.killbill.billing.plugin.analytics; import java.io.IOException; +import java.math.BigDecimal; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.Executor; @@ -27,6 +28,8 @@ import org.joda.time.DateTime; import org.killbill.billing.ObjectType; +import org.killbill.billing.invoice.api.Invoice; +import org.killbill.billing.invoice.api.InvoiceApiException; import org.killbill.billing.notification.plugin.api.ExtBusEvent; import org.killbill.billing.osgi.libs.killbill.OSGIConfigPropertiesService; import org.killbill.billing.osgi.libs.killbill.OSGIKillbillAPI; @@ -43,9 +46,11 @@ import org.killbill.billing.plugin.analytics.dao.BusinessSubscriptionTransitionDao; import org.killbill.billing.plugin.analytics.dao.CurrencyConversionDao; import org.killbill.billing.plugin.analytics.dao.factory.BusinessContextFactory; +import org.killbill.billing.plugin.api.PluginTenantContext; import org.killbill.billing.util.api.RecordIdApi; import org.killbill.billing.util.callcontext.CallContext; import org.killbill.billing.util.callcontext.CallOrigin; +import org.killbill.billing.util.callcontext.TenantContext; import org.killbill.billing.util.callcontext.UserType; import org.killbill.clock.Clock; import org.killbill.notificationq.DefaultNotificationQueueService; @@ -66,6 +71,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; +import static org.killbill.billing.notification.plugin.api.ExtBusEventType.PAYMENT_SUCCESS; import static org.killbill.billing.plugin.analytics.AnalyticsActivator.ANALYTICS_QUEUE_SERVICE; public class AnalyticsListener implements OSGIKillbillEventDispatcher.OSGIKillbillEventHandler { @@ -186,13 +192,30 @@ public void handleKillbillEvent(final ExtBusEvent killbillEvent) { return; } + AnalyticsJob job = new AnalyticsJob(killbillEvent); + if (AnalyticsJobHierarchy.fromEventType(job) == Group.INVOICES) { + try { + final TenantContext tenantContext = new PluginTenantContext(killbillEvent.getAccountId(), killbillEvent.getTenantId()); + final Invoice invoice = osgiKillbillAPI.getInvoiceUserApi().getInvoice(killbillEvent.getObjectId(), tenantContext); + if (BigDecimal.ZERO.compareTo(invoice.getCreditedAmount()) != 0 && invoice.getNumberOfPayments() > 0) { + // The invoice has payments and CBA was updated: payment rows must be updated + // See https://github.com/killbill/killbill-analytics-plugin/issues/105 + job = new AnalyticsJob(PAYMENT_SUCCESS, + killbillEvent.getObjectType(), + killbillEvent.getObjectId(), + killbillEvent.getAccountId(), + killbillEvent.getTenantId()); + } + } catch (final InvoiceApiException e) { + logger.warn("Unable to retrieve InvoiceUserApi for event {}, payment data might be stale", killbillEvent); + } + } + // Events we don't care about - if (shouldIgnoreEvent(killbillEvent)) { + if (shouldIgnoreEvent(job)) { return; } - final AnalyticsJob job = new AnalyticsJob(killbillEvent); - Long accountRecordId = null; Long tenantRecordId = null; final RecordIdApi recordIdApi = osgiKillbillAPI.getRecordIdApi(); @@ -273,10 +296,10 @@ public boolean apply(final NotificationEventWithMetadata notificat // Does the specified existing notification overlap with this job? private boolean jobsOverlap(final AnalyticsJob job, final NotificationEventWithMetadata notificationEventForExistingJob) { final AnalyticsJob existingJob = notificationEventForExistingJob.getEvent(); - final AnalyticsJobHierarchy.Group existingHierarchyGroup = AnalyticsJobHierarchy.fromEventType(existingJob.getEventType()); + final AnalyticsJobHierarchy.Group existingHierarchyGroup = AnalyticsJobHierarchy.fromEventType(existingJob); return existingJob.getAccountId().equals(job.getAccountId()) && - (existingHierarchyGroup.equals(AnalyticsJobHierarchy.fromEventType(job.getEventType())) || + (existingHierarchyGroup.equals(AnalyticsJobHierarchy.fromEventType(job)) || AnalyticsJobHierarchy.Group.ALL.equals(existingHierarchyGroup)); } @@ -288,7 +311,7 @@ private void handleAnalyticsJob(final AnalyticsJob job) throws AnalyticsRefreshE final CallContext callContext = new AnalyticsCallContext(job, clock); final BusinessContextFactory businessContextFactory = new BusinessContextFactory(job.getAccountId(), callContext, currencyConversionDao, osgiKillbillAPI, osgiConfigPropertiesService, clock, analyticsConfigurationHandler); - final Group group = AnalyticsJobHierarchy.fromEventType(job.getEventType()); + final Group group = AnalyticsJobHierarchy.fromEventType(job); logger.info("Starting {} Analytics refresh for account {}", group, businessContextFactory.getAccountId()); switch (group) { case ALL: @@ -326,8 +349,8 @@ protected boolean isAccountBlacklisted(@Nullable final UUID accountId) { } @VisibleForTesting - protected boolean shouldIgnoreEvent(final ExtBusEvent event) { - return Iterables.find(ignoredGroups, Predicates.equalTo(AnalyticsJobHierarchy.fromEventType(event.getEventType())), null) != null; + protected boolean shouldIgnoreEvent(final AnalyticsJob job) { + return Iterables.find(ignoredGroups, Predicates.equalTo(AnalyticsJobHierarchy.fromEventType(job)), null) != null; } @VisibleForTesting diff --git a/src/test/java/org/killbill/billing/plugin/analytics/TestAnalyticsListener.java b/src/test/java/org/killbill/billing/plugin/analytics/TestAnalyticsListener.java index 6ab04cda..d7b05c44 100644 --- a/src/test/java/org/killbill/billing/plugin/analytics/TestAnalyticsListener.java +++ b/src/test/java/org/killbill/billing/plugin/analytics/TestAnalyticsListener.java @@ -64,7 +64,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { final AnalyticsListener analyticsListener = new AnalyticsListener(killbillAPI, killbillDataSource, osgiConfigPropertiesService, null, clock, analyticsConfigurationHandler, notificationQueueService); - Assert.assertTrue(analyticsListener.shouldIgnoreEvent(cfEvent)); - Assert.assertFalse(analyticsListener.shouldIgnoreEvent(accountEvent)); + Assert.assertTrue(analyticsListener.shouldIgnoreEvent(new AnalyticsJob(cfEvent))); + Assert.assertFalse(analyticsListener.shouldIgnoreEvent(new AnalyticsJob(accountEvent))); } }