Skip to content

Commit

Permalink
Merge pull request #1451 from killbill/junction-optimization
Browse files Browse the repository at this point in the history
junction: rewrite BlockingCalculator to be more efficient
  • Loading branch information
sbrossie committed Jul 8, 2021
2 parents 0f07d53 + 4dc67fb commit 58a89e7
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 132 deletions.
@@ -1,7 +1,8 @@
/*
* Copyright 2010-2013 Ning, Inc.
* Copyright 2014-2016 Groupon, Inc
* Copyright 2014-2016 The Billing Project, LLC
* Copyright 2010-2014 Ning, Inc.
* Copyright 2014-2020 Groupon, Inc
* Copyright 2020-2021 Equinix, Inc
* Copyright 2014-2021 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
Expand All @@ -27,7 +28,6 @@
import org.killbill.billing.account.api.AccountApiException;
import org.killbill.billing.api.TestApiListener.NextEvent;
import org.killbill.billing.catalog.api.BillingPeriod;
import org.killbill.billing.catalog.api.PlanPhasePriceOverride;
import org.killbill.billing.catalog.api.PlanPhaseSpecifier;
import org.killbill.billing.catalog.api.PriceListSet;
import org.killbill.billing.entitlement.EntitlementTestSuiteWithEmbeddedDB;
Expand Down Expand Up @@ -301,8 +301,11 @@ public void testCancelWithEntitlementPolicyEOTNoCTDAndImmediateChange() throws A
assertListenerStatus();
final Entitlement entitlement = entitlementApi.getEntitlementForId(entitlementId, callContext);

clock.addDeltaFromReality(1000); // Make sure CHANGE does not collide with CREATE
assertListenerStatus();

// Immediate change during trial
testListener.pushExpectedEvent(NextEvent.CREATE);
testListener.pushExpectedEvent(NextEvent.CHANGE);
final PlanPhaseSpecifier planPhaseSpecifier = new PlanPhaseSpecifier("Assault-Rifle", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME);
entitlement.changePlan(new DefaultEntitlementSpecifier(planPhaseSpecifier), ImmutableList.<PluginProperty>of(), callContext);
assertListenerStatus();
Expand Down
@@ -1,6 +1,8 @@
/*
* Copyright 2014-2017 Groupon, Inc
* Copyright 2014-2017 The Billing Project, LLC
* Copyright 2010-2014 Ning, Inc.
* Copyright 2014-2020 Groupon, Inc
* Copyright 2020-2021 Equinix, Inc
* Copyright 2014-2021 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
Expand All @@ -18,9 +20,10 @@
package org.killbill.billing.junction.plumbing.billing;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -49,13 +52,10 @@
import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;

Expand All @@ -80,25 +80,44 @@ static AtomicLong getGlobalTotalOrder() {
*
* @param billingEvents the original list of billing events to update (without overdue events)
*/
public boolean insertBlockingEvents(final SortedSet<BillingEvent> billingEvents, final Set<UUID> skippedSubscriptions, final Map<UUID, List<SubscriptionBase>> subscriptionsForAccount, final VersionedCatalog catalog, final InternalTenantContext context) throws CatalogApiException {
public boolean insertBlockingEvents(final SortedSet<BillingEvent> billingEvents,
final Set<UUID> skippedSubscriptions,
final Map<UUID, List<SubscriptionBase>> subscriptionsForAccount,
final VersionedCatalog catalog,
final InternalTenantContext context) throws CatalogApiException {
if (billingEvents.size() <= 0) {
return false;
}

final SortedSet<BillingEvent> billingEventsToAdd = new TreeSet<BillingEvent>();
final SortedSet<BillingEvent> billingEventsToRemove = new TreeSet<BillingEvent>();
final Collection<BillingEvent> billingEventsToAdd = new TreeSet<BillingEvent>();
final Collection<BillingEvent> billingEventsToRemove = new TreeSet<BillingEvent>();

final List<BlockingState> blockingEvents = blockingApi.getBlockingAllForAccount(catalog, context);

final Iterable<BlockingState> accountBlockingEvents = Iterables.filter(blockingEvents, new Predicate<BlockingState>() {
@Override
public boolean apply(final BlockingState input) {
return BlockingStateType.ACCOUNT == input.getType();
// Group blocking states per type
final Collection<BlockingState> accountBlockingEvents = new LinkedList<BlockingState>();
final Map<UUID, List<BlockingState>> perBundleBlockingEvents = new HashMap<UUID, List<BlockingState>>();
final Map<UUID, List<BlockingState>> perSubscriptionBlockingEvents = new HashMap<UUID, List<BlockingState>>();
for (final BlockingState blockingEvent : blockingEvents) {
if (blockingEvent.getType() == BlockingStateType.ACCOUNT) {
accountBlockingEvents.add(blockingEvent);
} else if (blockingEvent.getType() == BlockingStateType.SUBSCRIPTION_BUNDLE) {
perBundleBlockingEvents.putIfAbsent(blockingEvent.getBlockedId(), new LinkedList<BlockingState>());
perBundleBlockingEvents.get(blockingEvent.getBlockedId()).add(blockingEvent);
} else if (blockingEvent.getType() == BlockingStateType.SUBSCRIPTION) {
perSubscriptionBlockingEvents.putIfAbsent(blockingEvent.getBlockedId(), new LinkedList<BlockingState>());
perSubscriptionBlockingEvents.get(blockingEvent.getBlockedId()).add(blockingEvent);
}
});
}

final Map<UUID, List<BlockingState>> perBundleBlockingEvents = getPerTypeBlockingEvents(BlockingStateType.SUBSCRIPTION_BUNDLE, blockingEvents);
final Map<UUID, List<BlockingState>> perSubscriptionBlockingEvents = getPerTypeBlockingEvents(BlockingStateType.SUBSCRIPTION, blockingEvents);
// Group billing events per subscriptionId
final Map<UUID, SortedSet<BillingEvent>> perSubscriptionBillingEvents = new HashMap<UUID, SortedSet<BillingEvent>>();
for (final BillingEvent event : billingEvents) {
if (!perSubscriptionBillingEvents.containsKey(event.getSubscriptionId())) {
perSubscriptionBillingEvents.put(event.getSubscriptionId(), new TreeSet<BillingEvent>());
}
perSubscriptionBillingEvents.get(event.getSubscriptionId()).add(event);
}

for (final Entry<UUID, List<SubscriptionBase>> entry : subscriptionsForAccount.entrySet()) {
final UUID bundleId = entry.getKey();
Expand All @@ -115,19 +134,17 @@ public boolean apply(final BlockingState input) {
final List<BlockingState> aggregateSubscriptionBlockingEvents = getAggregateBlockingEventsPerSubscription(subscription.getEndDate(), subscriptionBlockingEvents, bundleBlockingEvents, accountBlockingEvents);
final List<DisabledDuration> accountBlockingDurations = createBlockingDurations(aggregateSubscriptionBlockingEvents);

final SortedSet<BillingEvent> subscriptionBillingEvents = filter(billingEvents, subscription);
final SortedSet<BillingEvent> subscriptionBillingEvents = perSubscriptionBillingEvents.getOrDefault(subscription.getId(), ImmutableSortedSet.<BillingEvent>of());

final SortedSet<BillingEvent> newEvents = createNewEvents(accountBlockingDurations, subscriptionBillingEvents, catalog, context);
final SortedSet<BillingEvent> newEvents = createNewEvents(accountBlockingDurations, subscriptionBillingEvents, context);
billingEventsToAdd.addAll(newEvents);

final SortedSet<BillingEvent> removedEvents = eventsToRemove(accountBlockingDurations, subscriptionBillingEvents);
billingEventsToRemove.addAll(removedEvents);
}
}

for (final BillingEvent eventToAdd : billingEventsToAdd) {
billingEvents.add(eventToAdd);
}
billingEvents.addAll(billingEventsToAdd);

for (final BillingEvent eventToRemove : billingEventsToRemove) {
billingEvents.remove(eventToRemove);
Expand All @@ -136,40 +153,23 @@ public boolean apply(final BlockingState input) {
return !(billingEventsToAdd.isEmpty() && billingEventsToRemove.isEmpty());
}

final List<BlockingState> getAggregateBlockingEventsPerSubscription(@Nullable final DateTime subscriptionEndDate, final Iterable<BlockingState> subscriptionBlockingEvents, final Iterable<BlockingState> bundleBlockingEvents, final Iterable<BlockingState> accountBlockingEvents) {
final Iterable<BlockingState> tmp = Iterables.concat(subscriptionBlockingEvents, bundleBlockingEvents, accountBlockingEvents);
final Iterable<BlockingState> allEventsPriorToCancelDate = Iterables.filter(tmp,
new Predicate<BlockingState>() {
@Override
public boolean apply(final BlockingState input) {
return subscriptionEndDate == null || input.getEffectiveDate().compareTo(subscriptionEndDate) <= 0;
}
});
final List<BlockingState> result = Lists.newArrayList(allEventsPriorToCancelDate);
Collections.sort(result);
return result;
}

final Map<UUID, List<BlockingState>> getPerTypeBlockingEvents(final BlockingStateType type, final List<BlockingState> blockingEvents) {
final Iterable<BlockingState> bundleBlockingEvents = Iterables.filter(blockingEvents, new Predicate<BlockingState>() {
@Override
public boolean apply(final BlockingState input) {
return type == input.getType();
}
});

final Map<UUID, List<BlockingState>> perTypeBlockingEvents = new HashMap<UUID, List<BlockingState>>();
for (final BlockingState cur : bundleBlockingEvents) {
if (!perTypeBlockingEvents.containsKey(cur.getBlockedId())) {
perTypeBlockingEvents.put(cur.getBlockedId(), new ArrayList<BlockingState>());
final List<BlockingState> getAggregateBlockingEventsPerSubscription(@Nullable final DateTime subscriptionEndDate,
final Iterable<BlockingState> subscriptionBlockingEvents,
final Iterable<BlockingState> bundleBlockingEvents,
final Iterable<BlockingState> accountBlockingEvents) {
final List<BlockingState> result = new LinkedList<BlockingState>();
for (final BlockingState bs : Iterables.concat(subscriptionBlockingEvents, bundleBlockingEvents, accountBlockingEvents)) {
if (subscriptionEndDate == null || bs.getEffectiveDate().compareTo(subscriptionEndDate) <= 0) {
// Event is prior to cancel date
result.add(bs);
}
perTypeBlockingEvents.get(cur.getBlockedId()).add(cur);
}
return perTypeBlockingEvents;
Collections.sort(result);
return result;
}

protected SortedSet<BillingEvent> eventsToRemove(final List<DisabledDuration> disabledDuration,
final SortedSet<BillingEvent> subscriptionBillingEvents) {
protected SortedSet<BillingEvent> eventsToRemove(final Iterable<DisabledDuration> disabledDuration,
final Iterable<BillingEvent> subscriptionBillingEvents) {
final SortedSet<BillingEvent> result = new TreeSet<BillingEvent>();

for (final DisabledDuration duration : disabledDuration) {
Expand All @@ -186,8 +186,9 @@ protected SortedSet<BillingEvent> eventsToRemove(final List<DisabledDuration> di
return result;
}

protected SortedSet<BillingEvent> createNewEvents(final List<DisabledDuration> disabledDuration, final SortedSet<BillingEvent> subscriptionBillingEvents, final VersionedCatalog catalog, final InternalTenantContext context) throws CatalogApiException {

protected SortedSet<BillingEvent> createNewEvents(final Iterable<DisabledDuration> disabledDuration,
final Iterable<BillingEvent> subscriptionBillingEvents,
final InternalTenantContext context) throws CatalogApiException {
Preconditions.checkState(context.getAccountRecordId() != null);

final SortedSet<BillingEvent> result = new TreeSet<BillingEvent>();
Expand All @@ -201,17 +202,18 @@ protected SortedSet<BillingEvent> createNewEvents(final List<DisabledDuration> d
if (precedingInitialEvent != null) { // there is a preceding billing event
result.add(createNewDisableEvent(duration.getStart(), precedingInitialEvent));
if (duration.getEnd() != null) { // no second event in the pair means they are still disabled (no re-enable)
result.add(createNewReenableEvent(duration.getEnd(), precedingFinalEvent, context));
result.add(createNewReenableEvent(duration.getEnd(), precedingFinalEvent));
}
} else if (precedingFinalEvent != null) { // can happen - e.g. phase event
result.add(createNewReenableEvent(duration.getEnd(), precedingFinalEvent, context));
result.add(createNewReenableEvent(duration.getEnd(), precedingFinalEvent));
}
// N.B. if there's no precedingInitial and no precedingFinal then there's nothing to do
}
return result;
}

protected BillingEvent precedingBillingEventForSubscription(final DateTime disabledDurationStart, final SortedSet<BillingEvent> subscriptionBillingEvents) {
protected BillingEvent precedingBillingEventForSubscription(final DateTime disabledDurationStart,
final Iterable<BillingEvent> subscriptionBillingEvents) {
if (disabledDurationStart == null) {
return null;
}
Expand All @@ -228,18 +230,8 @@ protected BillingEvent precedingBillingEventForSubscription(final DateTime disab
return prev;
}

protected SortedSet<BillingEvent> filter(final SortedSet<BillingEvent> billingEvents, final SubscriptionBase subscription) {
final SortedSet<BillingEvent> result = new TreeSet<BillingEvent>();
for (final BillingEvent event : billingEvents) {
if (event.getSubscriptionId().equals(subscription.getId())) {
result.add(event);
}
}
return result;
}

protected BillingEvent createNewDisableEvent(final DateTime disabledDurationStart, final BillingEvent previousEvent) throws CatalogApiException {

protected BillingEvent createNewDisableEvent(final DateTime disabledDurationStart,
final BillingEvent previousEvent) {
final int billCycleDay = previousEvent.getBillCycleDayLocal();
final DateTime effectiveDate = disabledDurationStart;
final PlanPhase planPhase = previousEvent.getPlanPhase();
Expand Down Expand Up @@ -274,7 +266,8 @@ protected BillingEvent createNewDisableEvent(final DateTime disabledDurationStar
);
}

protected BillingEvent createNewReenableEvent(final DateTime odEventTime, final BillingEvent previousEvent, final InternalTenantContext context) throws CatalogApiException {
protected BillingEvent createNewReenableEvent(final DateTime odEventTime,
final BillingEvent previousEvent) throws CatalogApiException {
// All fields are populated with the event state from before the blocking period, for invoice to resume invoicing
final int billCycleDay = previousEvent.getBillCycleDayLocal();
final DateTime effectiveDate = odEventTime;
Expand Down Expand Up @@ -309,36 +302,23 @@ protected BillingEvent createNewReenableEvent(final DateTime odEventTime, final

// In ascending order
protected List<DisabledDuration> createBlockingDurations(final Iterable<BlockingState> inputBundleEvents) {

final List<DisabledDuration> result = new ArrayList<DisabledDuration>();

final Set<String> services = ImmutableSet.copyOf(Iterables.transform(inputBundleEvents, new Function<BlockingState, String>() {
@Override
public String apply(final BlockingState input) {
return input.getService();
}
}));
final List<DisabledDuration> result = new LinkedList<DisabledDuration>();

final Map<String, BlockingStateService> svcBlockedMap = new HashMap<String, BlockingStateService>();
for (String svc : services) {
svcBlockedMap.put(svc, new BlockingStateService());
for (final BlockingState bs : inputBundleEvents) {
final String service = bs.getService();
svcBlockedMap.putIfAbsent(service, new BlockingStateService());
svcBlockedMap.get(service).addBlockingState(bs);
}

for (final BlockingState e : inputBundleEvents) {
svcBlockedMap.get(e.getService()).addBlockingState(e);
final Collection<DisabledDuration> unorderedDisabledDuration = new LinkedList<DisabledDuration>();
for (final Entry<String, BlockingStateService> entry : svcBlockedMap.entrySet()) {
unorderedDisabledDuration.addAll(entry.getValue().build());
}

final Iterable<DisabledDuration> unorderedDisabledDuration = Iterables.concat(Iterables.transform(svcBlockedMap.values(), new Function<BlockingStateService, List<DisabledDuration>>() {
@Override
public List<DisabledDuration> apply(final BlockingStateService input) {
return input.build();
}
}));

final List<DisabledDuration> sortedDisabledDuration = Ordering.natural().sortedCopy(unorderedDisabledDuration);

DisabledDuration prevDuration = null;
for (DisabledDuration d : sortedDisabledDuration) {
for (final DisabledDuration d : sortedDisabledDuration) {
// isDisjoint
if (prevDuration == null) {
prevDuration = d;
Expand Down

0 comments on commit 58a89e7

Please sign in to comment.