Skip to content

Commit

Permalink
subscription: send effective events from transactions
Browse files Browse the repository at this point in the history
Relates to #451.

Signed-off-by: Pierre-Alexandre Meyer <pierre@mouraf.org>
  • Loading branch information
pierre committed Jan 13, 2016
1 parent 9b32f45 commit da2f94d
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 56 deletions.
Expand Up @@ -81,7 +81,7 @@ public DateTime changePlanWithPolicy(DefaultSubscriptionBase subscription, Strin
String priceList, List<PlanPhasePriceOverride> overrides, BillingActionPolicy policy, CallContext context)
throws SubscriptionBaseApiException;

public int cancelAddOnsIfRequired(final Product baseProduct, final UUID bundleId, final DateTime effectiveDate, final CallContext context) throws CatalogApiException;
public int cancelAddOnsIfRequiredOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final CallContext context) throws CatalogApiException;

public PlanChangeResult getPlanChangeResult(final DefaultSubscriptionBase subscription, final String productName,
final BillingPeriod term, final String priceList, final DateTime effectiveDate, TenantContext context) throws SubscriptionBaseApiException;
Expand Down
Expand Up @@ -519,13 +519,14 @@ public List<SubscriptionBaseEvent> getEventsOnCancelPlan(final DefaultSubscripti
}

@Override
public int cancelAddOnsIfRequired(final Product baseProduct, final UUID bundleId, final DateTime effectiveDate, final CallContext context) throws CatalogApiException {
public int cancelAddOnsIfRequiredOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final CallContext context) throws CatalogApiException {
final Product baseProduct = (subscription.getState() == EntitlementState.CANCELLED) ? null : subscription.getCurrentPlan().getProduct();

final List<SubscriptionBaseEvent> cancelEvents = new LinkedList<SubscriptionBaseEvent>();
final InternalCallContext internalCallContext = createCallContextFromBundleId(bundleId, context);
final List<DefaultSubscriptionBase> subscriptionsToBeCancelled = computeAddOnsToCancel(cancelEvents, baseProduct, bundleId, effectiveDate, internalCallContext);
if (!subscriptionsToBeCancelled.isEmpty()) {
dao.cancelSubscriptions(subscriptionsToBeCancelled, cancelEvents, internalCallContext);
}
final InternalCallContext internalCallContext = createCallContextFromBundleId(subscription.getBundleId(), context);
final List<DefaultSubscriptionBase> subscriptionsToBeCancelled = computeAddOnsToCancel(cancelEvents, baseProduct, subscription.getBundleId(), event.getEffectiveDate(), internalCallContext);
dao.cancelSubscriptionsOnBasePlanEvent(subscription, event, subscriptionsToBeCancelled, cancelEvents, internalCallContext);

return subscriptionsToBeCancelled.size();
}

Expand Down
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
* Copyright 2014-2015 Groupon, Inc
* Copyright 2014-2015 The Billing Project, LLC
* Copyright 2014-2016 Groupon, Inc
* Copyright 2014-2016 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
Expand All @@ -23,10 +23,7 @@
import org.joda.time.DateTime;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.catalog.api.CatalogApiException;
import org.killbill.billing.catalog.api.Product;
import org.killbill.billing.catalog.api.ProductCategory;
import org.killbill.billing.entitlement.api.Entitlement.EntitlementState;
import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
import org.killbill.billing.platform.api.LifecycleHandlerType;
import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
import org.killbill.billing.subscription.alignment.PlanAligner;
Expand All @@ -41,12 +38,12 @@
import org.killbill.billing.subscription.events.SubscriptionBaseEvent.EventType;
import org.killbill.billing.subscription.events.phase.PhaseEvent;
import org.killbill.billing.subscription.events.phase.PhaseEventData;
import org.killbill.billing.subscription.events.user.ApiEvent;
import org.killbill.billing.subscription.exceptions.SubscriptionBaseError;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.clock.Clock;
Expand Down Expand Up @@ -74,9 +71,10 @@ public class DefaultSubscriptionBaseService implements EventListener, Subscripti
private final PersistentBus eventBus;
private final NotificationQueueService notificationQueueService;
private final InternalCallContextFactory internalCallContextFactory;
private NotificationQueue subscriptionEventQueue;
private final SubscriptionBaseApiService apiService;

private NotificationQueue subscriptionEventQueue;

@Inject
public DefaultSubscriptionBaseService(final Clock clock, final SubscriptionDao dao, final PlanAligner planAligner,
final PersistentBus eventBus,
Expand Down Expand Up @@ -159,30 +157,32 @@ public void processEventReady(final SubscriptionBaseEvent event, final int seqId
return;
}

//
// Do any internal processing on that event before we send the event to the bus
//
int theRealSeqId = seqId;
boolean eventSent = false;
if (event.getType() == EventType.PHASE) {
onPhaseEvent(subscription, context);
eventSent = onPhaseEvent(subscription, event, context);
} else if (event.getType() == EventType.API_USER && subscription.getCategory() == ProductCategory.BASE) {
final CallContext callContext = internalCallContextFactory.createCallContext(context);
theRealSeqId = onBasePlanEvent(subscription, (ApiEvent) event, callContext);
eventSent = onBasePlanEvent(subscription, event, callContext);
}

final SubscriptionBaseTransitionData transition = (subscription.getTransitionFromEvent(event, theRealSeqId));
final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, subscription.getAlignStartDate(),
context.getUserToken(),
context.getAccountRecordId(), context.getTenantRecordId());
eventBus.post(busEvent);
if (!eventSent) {
// Methods above invoking the DAO will send this event directly from the transaction

This comment has been minimized.

Copy link
@sbrossie

sbrossie Jan 25, 2016

Member

If the method onPhaseEvent and onBasePlanEvent, why do we then send the event at this level?

final SubscriptionBaseTransitionData transition = subscription.getTransitionFromEvent(event, seqId);
final BusEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition,
subscription.getAlignStartDate(),
context.getUserToken(),
context.getAccountRecordId(),
context.getTenantRecordId());
eventBus.post(busEvent);
}
} catch (final EventBusException e) {
log.warn("Failed to post subscription event " + event, e);
} catch (final CatalogApiException e) {
log.warn("Failed to post subscription event " + event, e);
}
}

private void onPhaseEvent(final DefaultSubscriptionBase subscription, final InternalCallContext context) {
private boolean onPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent readyPhaseEvent, final InternalCallContext context) {
try {
final DateTime now = clock.getUTCNow();
final TimedPhase nextTimedPhase = planAligner.getNextTimedPhase(subscription, now, context);
Expand All @@ -191,15 +191,18 @@ private void onPhaseEvent(final DefaultSubscriptionBase subscription, final Inte
nextTimedPhase.getPhase().getName(), nextTimedPhase.getStartPhase()) :
null;
if (nextPhaseEvent != null) {
dao.createNextPhaseEvent(subscription, nextPhaseEvent, context);
dao.createNextPhaseEvent(subscription, readyPhaseEvent, nextPhaseEvent, context);
return true;
}
} catch (final SubscriptionBaseError e) {
log.error(String.format("Failed to insert next phase for subscription %s", subscription.getId()), e);
}

return false;
}

private int onBasePlanEvent(final DefaultSubscriptionBase baseSubscription, final ApiEvent event, final CallContext context) throws CatalogApiException {
final Product baseProduct = (baseSubscription.getState() == EntitlementState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
return apiService.cancelAddOnsIfRequired(baseProduct, baseSubscription.getBundleId(), event.getEffectiveDate(), context);
private boolean onBasePlanEvent(final DefaultSubscriptionBase baseSubscription, final SubscriptionBaseEvent event, final CallContext context) throws CatalogApiException {
apiService.cancelAddOnsIfRequiredOnBasePlanEvent(baseSubscription, event, context);
return true;
}
}
Expand Up @@ -45,7 +45,6 @@
import org.killbill.billing.catalog.api.ProductCategory;
import org.killbill.billing.entitlement.api.SubscriptionApiException;
import org.killbill.billing.entity.EntityPersistenceException;
import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
import org.killbill.billing.subscription.api.SubscriptionBase;
import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
import org.killbill.billing.subscription.api.migration.AccountMigrationData;
Expand Down Expand Up @@ -86,6 +85,7 @@
import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.clock.Clock;
Expand Down Expand Up @@ -405,20 +405,21 @@ public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFa
}

@Override
public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent nextPhase, final InternalCallContext context) {
public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent readyPhaseEvent, final SubscriptionBaseEvent nextPhaseEvent, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
final SubscriptionEventSqlDao transactional = entitySqlDaoWrapperFactory.become(SubscriptionEventSqlDao.class);
final UUID subscriptionId = subscription.getId();
cancelNextPhaseEventFromTransaction(subscriptionId, entitySqlDaoWrapperFactory, context);
transactional.create(new SubscriptionEventModelDao(nextPhase), context);
transactional.create(new SubscriptionEventModelDao(nextPhaseEvent), context);
recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
nextPhase.getEffectiveDate(),
new SubscriptionNotificationKey(nextPhase.getId()), context);
nextPhaseEvent.getEffectiveDate(),
new SubscriptionNotificationKey(nextPhaseEvent.getId()), context);

// Notify the Bus of the requested change
notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, nextPhase, SubscriptionBaseTransitionType.PHASE, context);
// Notify the Bus
notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, nextPhaseEvent, SubscriptionBaseTransitionType.PHASE, context);
notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, readyPhaseEvent, 0, context);

return null;
}
Expand Down Expand Up @@ -560,6 +561,19 @@ public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFa
});
}

@Override
public void cancelSubscriptionsOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
cancelSubscriptionsFromTransaction(entitySqlDaoWrapperFactory, subscriptions, cancelEvents, context);
// Make sure to always send the event, even if there were no subscriptions to cancel
notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, subscriptions.size(), context);
return null;
}
});
}

@Override
public void cancelSubscriptions(final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
Expand Down Expand Up @@ -1046,12 +1060,12 @@ private SubscriptionBase getBaseSubscription(final UUID bundleId, final boolean
}

//
// Either records a notfication or sends a bus event is operation is immediate
// Either records a notification or sends a bus event if operation is immediate
//
private void recordBusOrFutureNotificationFromTransaction(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final boolean busEvent,
final int seqId, final InternalCallContext context) {
if (busEvent) {
notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
rebuildSubscriptionAndNotifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
} else {
recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
event.getEffectiveDate(),
Expand All @@ -1060,25 +1074,29 @@ private void recordBusOrFutureNotificationFromTransaction(final DefaultSubscript
}
}

//
// Sends bus notification for event on effecfive date-- only used for operation that happen immediately:
// - CREATE,
// - IMM CANCEL or CHANGE
//
private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final DefaultSubscriptionBase subscription,
final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
// Sends bus notification for event on effective date -- only used for operation that happen immediately
private void rebuildSubscriptionAndNotifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final DefaultSubscriptionBase subscription,
final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {

This comment has been minimized.

Copy link
@sbrossie

sbrossie Jan 25, 2016

Member

It seems counter-intuitive to have to send events at the time we rebuild, since this should just a be a read-only operation?

try {
final DefaultSubscriptionBase upToDateSubscription = createSubscriptionWithNewEvent(subscription, immediateEvent, context);
notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, upToDateSubscription, immediateEvent, seqId, context);
} catch (final CatalogApiException e) {
log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
}
}

final SubscriptionBaseTransitionData transition = upToDateSubscription.getTransitionFromEvent(immediateEvent, seqId);
final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, upToDateSubscription.getAlignStartDate(),
context.getUserToken(),
context.getAccountRecordId(), context.getTenantRecordId());
private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final DefaultSubscriptionBase subscription,
final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
try {
final SubscriptionBaseTransitionData transition = subscription.getTransitionFromEvent(immediateEvent, seqId);
final BusEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition,
subscription.getAlignStartDate(),
context.getUserToken(),
context.getAccountRecordId(),
context.getTenantRecordId());

eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory.getHandle().getConnection());
} catch (EventBusException e) {
log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
} catch (CatalogApiException e) {
} catch (final EventBusException e) {
log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
}
}
Expand All @@ -1087,7 +1105,7 @@ private void notifyBusOfRequestedChange(final EntitySqlDaoWrapperFactory entityS
final SubscriptionBaseEvent nextEvent, final SubscriptionBaseTransitionType transitionType, final InternalCallContext context) {
try {
eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent, transitionType, context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken()), entitySqlDaoWrapperFactory.getHandle().getConnection());
} catch (EventBusException e) {
} catch (final EventBusException e) {
log.warn("Failed to post requested change event for subscription " + subscription.getId(), e);
}
}
Expand Down

0 comments on commit da2f94d

Please sign in to comment.