Skip to content

Commit

Permalink
Fixes #355
Browse files Browse the repository at this point in the history
  • Loading branch information
sbrossie committed Aug 21, 2015
1 parent 39a8d61 commit f20b73b
Show file tree
Hide file tree
Showing 20 changed files with 185 additions and 137 deletions.
@@ -0,0 +1,99 @@
/*
* Copyright 2014-2015 Groupon, Inc
* Copyright 2014-2015 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.killbill.billing.payment.core;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;

import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.commons.concurrent.Executors;
import org.killbill.commons.concurrent.WithProfilingThreadPoolExecutor;

public class PaymentExecutors {

private static final long TIMEOUT_EXECUTOR_SEC = 3L;

private static final String PLUGIN_THREAD_PREFIX = "Plugin-th-";
private static final String PAYMENT_PLUGIN_TH_GROUP_NAME = "pay-plugin-grp";

public static final String JANITOR_EXECUTOR_NAMED = "JanitorExecutor";
public static final String PLUGIN_EXECUTOR_NAMED = "PluginExecutor";

private final PaymentConfig paymentConfig;

private volatile ExecutorService pluginExecutorService;
private volatile ScheduledExecutorService janitorExecutorService;

@Inject
public PaymentExecutors(PaymentConfig paymentConfig) {
this.paymentConfig = paymentConfig;

}

public void initialize() {
this.pluginExecutorService = createPluginExecutorService();
this.janitorExecutorService = createJanitorExecutorService();
}


public void stop() throws InterruptedException {
pluginExecutorService.shutdownNow();
janitorExecutorService.shutdownNow();

pluginExecutorService.awaitTermination(TIMEOUT_EXECUTOR_SEC, TimeUnit.SECONDS);
pluginExecutorService = null;

janitorExecutorService.awaitTermination(TIMEOUT_EXECUTOR_SEC, TimeUnit.SECONDS);
janitorExecutorService = null;
}

public ExecutorService getPluginExecutorService() {
return pluginExecutorService;
}

public ScheduledExecutorService getJanitorExecutorService() {
return janitorExecutorService;
}

private ExecutorService createPluginExecutorService() {
return new WithProfilingThreadPoolExecutor(paymentConfig.getPaymentPluginThreadNb(),
paymentConfig.getPaymentPluginThreadNb(),
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {

@Override
public Thread newThread(final Runnable r) {
final Thread th = new Thread(new ThreadGroup(PAYMENT_PLUGIN_TH_GROUP_NAME), r);
th.setName(PLUGIN_THREAD_PREFIX + th.getId());
return th;
}
});

}

private ScheduledExecutorService createJanitorExecutorService() {
return Executors.newSingleThreadScheduledExecutor("PaymentJanitor");
}
}
Expand Up @@ -19,7 +19,6 @@

import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
Expand Down Expand Up @@ -47,13 +46,8 @@
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLocker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Objects;
import com.google.inject.name.Named;

import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;

// We don't take any lock here because the call needs to be re-entrant
// from the plugin: for example, the BitPay plugin will create the payment during the
Expand All @@ -65,8 +59,6 @@ public class PaymentGatewayProcessor extends ProcessorBase {
private final PluginDispatcher<HostedPaymentPageFormDescriptor> paymentPluginFormDispatcher;
private final PluginDispatcher<GatewayNotification> paymentPluginNotificationDispatcher;

private static final Logger log = LoggerFactory.getLogger(PaymentGatewayProcessor.class);

@Inject
public PaymentGatewayProcessor(final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
final AccountInternalApi accountUserApi,
Expand All @@ -75,13 +67,13 @@ public PaymentGatewayProcessor(final OSGIServiceRegistration<PaymentPluginApi> p
final PaymentDao paymentDao,
final GlobalLocker locker,
final PaymentConfig paymentConfig,
@Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
final PaymentExecutors executors,
final InternalCallContextFactory internalCallContextFactory,
final Clock clock) {
super(pluginRegistry, accountUserApi, paymentDao, tagUserApi, locker, executor, internalCallContextFactory, invoiceApi, clock);
super(pluginRegistry, accountUserApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
final long paymentPluginTimeoutSec = TimeUnit.SECONDS.convert(paymentConfig.getPaymentPluginTimeout().getPeriod(), paymentConfig.getPaymentPluginTimeout().getUnit());
this.paymentPluginFormDispatcher = new PluginDispatcher<HostedPaymentPageFormDescriptor>(paymentPluginTimeoutSec, executor);
this.paymentPluginNotificationDispatcher = new PluginDispatcher<GatewayNotification>(paymentPluginTimeoutSec, executor);
this.paymentPluginFormDispatcher = new PluginDispatcher<HostedPaymentPageFormDescriptor>(paymentPluginTimeoutSec, executors);
this.paymentPluginNotificationDispatcher = new PluginDispatcher<GatewayNotification>(paymentPluginTimeoutSec, executors);
}

public GatewayNotification processNotification(final String notification, final String pluginName, final Iterable<PluginProperty> properties, final CallContext callContext) throws PaymentApiException {
Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -52,6 +51,7 @@
import org.killbill.billing.payment.provider.DefaultPaymentMethodInfoPlugin;
import org.killbill.billing.payment.provider.ExternalPaymentProviderPlugin;
import org.killbill.billing.tag.TagInternalApi;
import org.killbill.billing.util.UUIDs;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.TenantContext;
Expand All @@ -69,10 +69,7 @@
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.name.Named;

import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
import org.killbill.billing.util.UUIDs;
import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPagination;
import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPaginationFromPlugins;

Expand All @@ -90,12 +87,12 @@ public PaymentMethodProcessor(final OSGIServiceRegistration<PaymentPluginApi> pl
final TagInternalApi tagUserApi,
final GlobalLocker locker,
final PaymentConfig paymentConfig,
@Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
final PaymentExecutors executors,
final InternalCallContextFactory internalCallContextFactory,
final Clock clock) {
super(pluginRegistry, accountInternalApi, paymentDao, tagUserApi, locker, executor, internalCallContextFactory, invoiceApi, clock);
super(pluginRegistry, accountInternalApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
final long paymentPluginTimeoutSec = TimeUnit.SECONDS.convert(paymentConfig.getPaymentPluginTimeout().getPeriod(), paymentConfig.getPaymentPluginTimeout().getUnit());
this.uuidPluginNotificationDispatcher = new PluginDispatcher<UUID>(paymentPluginTimeoutSec, executor);
this.uuidPluginNotificationDispatcher = new PluginDispatcher<UUID>(paymentPluginTimeoutSec, executors);
}

public UUID addPaymentMethod(final String paymentMethodExternalKey, final String paymentPluginServiceName, final Account account,
Expand Down
Expand Up @@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

import javax.annotation.Nullable;
import javax.inject.Inject;
Expand Down Expand Up @@ -75,9 +74,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.name.Named;

import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPagination;
import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPaginationFromPlugins;

Expand All @@ -98,11 +95,10 @@ public PaymentProcessor(final OSGIServiceRegistration<PaymentPluginApi> pluginRe
final PaymentDao paymentDao,
final InternalCallContextFactory internalCallContextFactory,
final GlobalLocker locker,
@Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
final PaymentAutomatonRunner paymentAutomatonRunner,
final IncompletePaymentTransactionTask incompletePaymentTransactionTask,
final Clock clock) {
super(pluginRegistry, accountUserApi, paymentDao, tagUserApi, locker, executor, internalCallContextFactory, invoiceApi, clock);
super(pluginRegistry, accountUserApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
this.paymentAutomatonRunner = paymentAutomatonRunner;
this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
}
Expand Down
Expand Up @@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

import javax.annotation.Nullable;
import javax.inject.Inject;
Expand Down Expand Up @@ -55,9 +54,6 @@

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.inject.name.Named;

import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;

public class PluginControlPaymentProcessor extends ProcessorBase {

Expand All @@ -73,12 +69,11 @@ public PluginControlPaymentProcessor(final OSGIServiceRegistration<PaymentPlugin
final TagInternalApi tagUserApi,
final PaymentDao paymentDao,
final GlobalLocker locker,
@Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
final InternalCallContextFactory internalCallContextFactory,
final PluginControlPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
final PaymentControlStateMachineHelper paymentControlStateMachineHelper,
final Clock clock) {
super(pluginRegistry, accountInternalApi, paymentDao, tagUserApi, locker, executor, internalCallContextFactory, invoiceApi, clock);
super(pluginRegistry, accountInternalApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
this.paymentControlStateMachineHelper = paymentControlStateMachineHelper;
this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
}
Expand Down
Expand Up @@ -37,11 +37,8 @@
import org.killbill.billing.invoice.api.InvoiceInternalApi;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.dao.PaymentDao;
import org.killbill.billing.payment.dao.PaymentMethodModelDao;
import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
import org.killbill.billing.payment.dispatcher.CallableWithRequestData;
import org.killbill.billing.payment.dispatcher.PluginDispatcher;
import org.killbill.billing.payment.dispatcher.PluginDispatcher.PluginDispatcherReturnType;
import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
Expand All @@ -57,15 +54,12 @@
import org.killbill.commons.locker.GlobalLock;
import org.killbill.commons.locker.GlobalLocker;
import org.killbill.commons.locker.LockFailedException;
import org.killbill.commons.request.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Iterables;

public abstract class ProcessorBase {

Expand All @@ -74,7 +68,6 @@ public abstract class ProcessorBase {
protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
protected final AccountInternalApi accountInternalApi;
protected final GlobalLocker locker;
protected final ExecutorService executor;
protected final PaymentDao paymentDao;
protected final InternalCallContextFactory internalCallContextFactory;
protected final TagInternalApi tagInternalApi;
Expand All @@ -88,15 +81,13 @@ public ProcessorBase(final OSGIServiceRegistration<PaymentPluginApi> pluginRegis
final PaymentDao paymentDao,
final TagInternalApi tagInternalApi,
final GlobalLocker locker,
final ExecutorService executor,
final InternalCallContextFactory internalCallContextFactory,
final InvoiceInternalApi invoiceApi,
final Clock clock) {
this.pluginRegistry = pluginRegistry;
this.accountInternalApi = accountInternalApi;
this.paymentDao = paymentDao;
this.locker = locker;
this.executor = executor;
this.tagInternalApi = tagInternalApi;
this.internalCallContextFactory = internalCallContextFactory;
this.invoiceApi = invoiceApi;
Expand Down Expand Up @@ -168,6 +159,7 @@ protected CallContext buildCallContext(final InternalCallContext context) {

// TODO Rename - there is no lock!
public interface WithAccountLockCallback<PluginDispatcherReturnType, ExceptionType extends Exception> {

public PluginDispatcherReturnType doOperation() throws ExceptionType;
}

Expand Down
Expand Up @@ -22,12 +22,11 @@
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.inject.Named;

import org.joda.time.DateTime;
import org.killbill.billing.events.PaymentInternalEvent;
import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.glue.DefaultPaymentService;
import org.killbill.billing.payment.glue.PaymentModule;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationQueue;
Expand All @@ -49,27 +48,28 @@ public class Janitor {
public static final String QUEUE_NAME = "janitor";

private final NotificationQueueService notificationQueueService;
private final ScheduledExecutorService janitorExecutor;
private final PaymentConfig paymentConfig;
private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
private final PaymentExecutors paymentExecutors;

private NotificationQueue janitorQueue;
private ScheduledExecutorService janitorExecutor;

private volatile boolean isStopped;

@Inject
public Janitor(final PaymentConfig paymentConfig,
final NotificationQueueService notificationQueueService,
@Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor,
final PaymentExecutors paymentExecutors,
final IncompletePaymentAttemptTask incompletePaymentAttemptTask,
final IncompletePaymentTransactionTask incompletePaymentTransactionTask) {
this.notificationQueueService = notificationQueueService;
this.janitorExecutor = janitorExecutor;
this.paymentExecutors = paymentExecutors;
this.paymentConfig = paymentConfig;
this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
this.isStopped = false;

}

public void initialize() throws NotificationQueueAlreadyExists {
Expand All @@ -95,10 +95,10 @@ public void handleReadyNotification(final NotificationEvent notificationKey, fin
}

public void start() {
if (isStopped) {
log.warn("Janitor is not a restartable service, and was already started, aborting");
return;
}

this.isStopped = false;

janitorExecutor = paymentExecutors.getJanitorExecutorService();

janitorQueue.startQueue();

Expand Down

0 comments on commit f20b73b

Please sign in to comment.