Skip to content

Commit

Permalink
subscription: code review for #897
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre-Alexandre Meyer <pierre@mouraf.org>
  • Loading branch information
pierre committed Mar 14, 2018
1 parent 3931c16 commit 1374486
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 81 deletions.
Expand Up @@ -671,7 +671,28 @@ public void rebuildTransitions(final List<SubscriptionBaseEvent> inputEvents, fi

this.events = inputEvents;

filterOutDuplicateCancelEvents(events);
Collections.sort(inputEvents, new Comparator<SubscriptionBaseEvent>() {
@Override
public int compare(final SubscriptionBaseEvent o1, final SubscriptionBaseEvent o2) {
final int res = o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
if (res != 0) {
return res;
}

// In-memory events have a total order of 0, make sure they are after on disk event
if (o1.getTotalOrdering() == 0 && o2.getTotalOrdering() > 0) {
return 1;
} else if (o1.getTotalOrdering() > 0 && o2.getTotalOrdering() == 0) {
return -1;
} else if (o1.getTotalOrdering() == o2.getTotalOrdering()) {
return 0;
} else {
return o1.getTotalOrdering() < (o2.getTotalOrdering()) ? -1 : 1;
}
}
});

removeEverythingPastCancelEvent(events);

UUID nextUserToken = null;

Expand All @@ -692,30 +713,11 @@ public void rebuildTransitions(final List<SubscriptionBaseEvent> inputEvents, fi

transitions = new LinkedList<SubscriptionBaseTransition>();

// DefaultSubscriptionDao#buildBundleSubscriptions may have added an out-of-order cancellation event (https://github.com/killbill/killbill/issues/897)
final SubscriptionBaseEvent cancellationEvent = Iterables.tryFind(inputEvents,
new Predicate<SubscriptionBaseEvent>() {
@Override
public boolean apply(final SubscriptionBaseEvent input) {
return input.getType() == EventType.API_USER && ((ApiEvent) input).getApiEventType() == ApiEventType.CANCEL;
}
}).orNull();
for (final SubscriptionBaseEvent cur : inputEvents) {
if (!cur.isActive()) {
continue;
}

if (cancellationEvent != null) {
if (cur.getId().compareTo(cancellationEvent.getId()) == 0) {
// Keep the cancellation event
} else if (cur.getType() == EventType.API_USER && (((ApiEvent) cur).getApiEventType() == ApiEventType.TRANSFER || ((ApiEvent) cur).getApiEventType() == ApiEventType.CREATE)) {
// Keep the initial event (SOT use-case)
} else if (cur.getEffectiveDate().compareTo(cancellationEvent.getEffectiveDate()) >= 0) {
// Event to ignore past cancellation date
continue;
}
}

ApiEventType apiEventType = null;
boolean isFromDisk = true;

Expand Down Expand Up @@ -811,73 +813,41 @@ public boolean apply(final SubscriptionBaseEvent input) {
}
}

// Skip any event after a CANCEL event:
//
// Hardening against data integrity issues where we have multiple active CANCEL (See #619):
// We skip any cancel events after the first one (subscription cannot be cancelled multiple times).
// The code should prevent such cases from happening but because of #619, some invalid data could be there so to be safe we added this code
//
// Also we remove !onDisk cancel events when there is an onDisk cancel event (can happen during the path where we process the base plan cancel notification, and are
// in the process of adding the new cancel events for the AO)
// * DefaultSubscriptionDao#buildBundleSubscriptions may have added an out-of-order cancellation event (https://github.com/killbill/killbill/issues/897)
// * Hardening against data integrity issues where we have multiple active CANCEL (https://github.com/killbill/killbill/issues/619)
//
private void filterOutDuplicateCancelEvents(final List<SubscriptionBaseEvent> inputEvents) {

Collections.sort(inputEvents, new Comparator<SubscriptionBaseEvent>() {
@Override
public int compare(final SubscriptionBaseEvent o1, final SubscriptionBaseEvent o2) {
int res = o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
if (res == 0) {
// In-memory events have a total order of 0, make sure they are after on disk event
if (o1.getTotalOrdering() == 0 && o2.getTotalOrdering() > 0) {
return 1;
} else if (o1.getTotalOrdering() > 0 && o2.getTotalOrdering() == 0) {
return -1;
} else {
res = o1.getTotalOrdering() < (o2.getTotalOrdering()) ? -1 : 1;
}
}
return res;
}
});

final boolean isCancelled = Iterables.any(inputEvents, new Predicate<SubscriptionBaseEvent>() {
@Override
public boolean apply(final SubscriptionBaseEvent input) {
if (input.isActive() && input.getType() == EventType.API_USER) {
final ApiEvent userEV = (ApiEvent) input;
if (userEV.getApiEventType() == ApiEventType.CANCEL && userEV.isFromDisk()) {
return true;
}
}
return false;
}
});

if (!isCancelled) {
private void removeEverythingPastCancelEvent(final List<SubscriptionBaseEvent> inputEvents) {
final SubscriptionBaseEvent cancellationEvent = Iterables.tryFind(inputEvents,
new Predicate<SubscriptionBaseEvent>() {
@Override
public boolean apply(final SubscriptionBaseEvent input) {
return input.getType() == EventType.API_USER && ((ApiEvent) input).getApiEventType() == ApiEventType.CANCEL;
}
}).orNull();
if (cancellationEvent == null) {
return;
}


boolean foundFirstOnDiskCancel = false;
final Iterator<SubscriptionBaseEvent> it = inputEvents.iterator();
while(it.hasNext()) {
final Iterator<SubscriptionBaseEvent> it = inputEvents.iterator();
while (it.hasNext()) {
final SubscriptionBaseEvent input = it.next();
if (!input.isActive()) {
continue;
}

if (input.getType() == EventType.API_USER) {
final ApiEvent userEV = (ApiEvent) input;
if (userEV.getApiEventType() == ApiEventType.CANCEL) {
if (userEV.isFromDisk()) {
if (!foundFirstOnDiskCancel) {
foundFirstOnDiskCancel = true;
} else {
it.remove();
}
} else {
it.remove();
}
}
if (input.getId().compareTo(cancellationEvent.getId()) == 0) {
// Keep the cancellation event
} else if (input.getType() == EventType.API_USER && (((ApiEvent) input).getApiEventType() == ApiEventType.TRANSFER || ((ApiEvent) input).getApiEventType() == ApiEventType.CREATE)) {
// Keep the initial event (SOT use-case)
} else if (input.getType() == EventType.API_USER && (((ApiEvent) input).getApiEventType() == ApiEventType.CANCEL) && !((ApiEvent) input).isFromDisk()) {
// Also we remove !onDisk cancel events when there is an onDisk cancel event (can happen during the path where we process the base plan cancel notification, and are
// in the process of adding the new cancel events for the AO)
it.remove();
} else if (input.getEffectiveDate().compareTo(cancellationEvent.getEffectiveDate()) >= 0) {
// Event to ignore past cancellation date
it.remove();
}
}
}
Expand Down
Expand Up @@ -809,13 +809,14 @@ private void cancelSubscriptionFromTransaction(final DefaultSubscriptionBase sub
final UUID subscriptionId = subscription.getId();
cancelFutureEventsFromTransaction(subscriptionId, cancelEvent.getEffectiveDate(), entitySqlDaoWrapperFactory, true, context);
final SubscriptionEventSqlDao subscriptionEventSqlDao = entitySqlDaoWrapperFactory.become(SubscriptionEventSqlDao.class);
createAndRefresh(subscriptionEventSqlDao, new SubscriptionEventModelDao(cancelEvent), context);
final SubscriptionEventModelDao cancelEventWithUpdatedTotalOrdering = createAndRefresh(subscriptionEventSqlDao, new SubscriptionEventModelDao(cancelEvent), context);

final boolean isBusEvent = cancelEvent.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0;
recordBusOrFutureNotificationFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, isBusEvent, seqId, catalog, context);
final SubscriptionBaseEvent refreshedSubscriptionEvent = SubscriptionEventModelDao.toSubscriptionEvent(cancelEventWithUpdatedTotalOrdering);
final boolean isBusEvent = refreshedSubscriptionEvent.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0;
recordBusOrFutureNotificationFromTransaction(subscription, refreshedSubscriptionEvent, entitySqlDaoWrapperFactory, isBusEvent, seqId, catalog, context);

// Notify the Bus of the requested change
notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, cancelEvent, SubscriptionBaseTransitionType.CANCEL, context);
notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, refreshedSubscriptionEvent, SubscriptionBaseTransitionType.CANCEL, context);
}

private void cancelNextPhaseEventFromTransaction(final UUID subscriptionId, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalCallContext context) {
Expand Down

0 comments on commit 1374486

Please sign in to comment.