Skip to content

Commit

Permalink
See #343
Browse files Browse the repository at this point in the history
Add Pagination API for Janitor opeation to make sure they go through all the records at each iteration
  • Loading branch information
sbrossie committed Jul 8, 2015
1 parent 951a786 commit be2b6b8
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 52 deletions.
Expand Up @@ -83,7 +83,7 @@ public void run() {
log.info("Janitor was requested to stop");
return;
}
final List<T> items = getItemsForIteration();
final Iterable<T> items = getItemsForIteration();
for (final T item : items) {
if (isStopped) {
log.info("Janitor was requested to stop");
Expand All @@ -101,7 +101,7 @@ public synchronized void stop() {
this.isStopped = true;
}

public abstract List<T> getItemsForIteration();
public abstract Iterable<T> getItemsForIteration();

public abstract void doIteration(final T item);

Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.billing.util.entity.Pagination;
import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLocker;

Expand Down Expand Up @@ -70,10 +71,10 @@ public IncompletePaymentAttemptTask(final InternalCallContextFactory internalCal
}

@Override
public List<PaymentAttemptModelDao> getItemsForIteration() {
final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByStateAcrossTenants(retrySMHelper.getInitialState().getName(), getCreatedDateBefore());
if (!incompleteAttempts.isEmpty()) {
log.info("Janitor AttemptCompletionTask start run: found {} incomplete attempts", incompleteAttempts.size());
public Iterable<PaymentAttemptModelDao> getItemsForIteration() {
final Pagination<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByStateAcrossTenants(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), 0L, Long.MAX_VALUE);
if (incompleteAttempts.getTotalNbRecords() > 0) {
log.info("Janitor AttemptCompletionTask start run: found {} incomplete attempts", incompleteAttempts.getTotalNbRecords());
}
return incompleteAttempts;
}
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.TenantContext;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.billing.util.entity.Pagination;
import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLocker;

Expand All @@ -63,8 +64,6 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
.add(TransactionStatus.PENDING)
.add(TransactionStatus.UNKNOWN)
.build();
private static final int MAX_ITEMS_PER_LOOP = 100;

@Inject
public IncompletePaymentTransactionTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
final PaymentDao paymentDao, final Clock clock,
Expand All @@ -74,10 +73,10 @@ public IncompletePaymentTransactionTask(final InternalCallContextFactory interna
}

@Override
public List<PaymentTransactionModelDao> getItemsForIteration() {
final List<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(TRANSACTION_STATUSES_TO_CONSIDER, getCreatedDateBefore(), getCreatedDateAfter(), MAX_ITEMS_PER_LOOP);
if (!result.isEmpty()) {
log.info("Janitor IncompletePaymentTransactionTask start run: found {} pending/unknown payments", result.size());
public Iterable<PaymentTransactionModelDao> getItemsForIteration() {
final Pagination<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(TRANSACTION_STATUSES_TO_CONSIDER, getCreatedDateBefore(), getCreatedDateAfter(), 0L, Long.MAX_VALUE);
if (result.getTotalNbRecords() > 0) {
log.info("Janitor IncompletePaymentTransactionTask start run: found {} pending/unknown payments", result.getTotalNbRecords());
}
return result;
}
Expand Down
Expand Up @@ -20,11 +20,9 @@

import java.math.BigDecimal;
import java.util.Collection;
import java.util.HashMap;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import javax.annotation.Nullable;
Expand All @@ -41,10 +39,12 @@
import org.killbill.billing.payment.api.DefaultPaymentPluginErrorEvent;
import org.killbill.billing.payment.api.Payment;
import org.killbill.billing.payment.api.PaymentMethod;
import org.killbill.billing.payment.api.PaymentTransaction;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.api.TransactionType;
import org.killbill.billing.util.cache.CacheControllerDispatcher;
import org.killbill.billing.util.dao.NonEntityDao;
import org.killbill.billing.util.entity.Entity;
import org.killbill.billing.util.entity.Pagination;
import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper;
import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
Expand All @@ -58,7 +58,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
Expand Down Expand Up @@ -122,14 +121,24 @@ public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFa
}

@Override
public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate) {
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentAttemptModelDao>>() {
@Override
public List<PaymentAttemptModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
final PaymentAttemptSqlDao transactional = entitySqlDaoWrapperFactory.become(PaymentAttemptSqlDao.class);
return transactional.getByStateNameAcrossTenants(stateName, createdBeforeDate.toDate());
}
});
public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate, final Long offset, final Long limit) {

final Date createdBefore = createdBeforeDate.toDate();
return paginationHelper.getPagination(PaymentAttemptSqlDao.class, new PaginationIteratorBuilder<PaymentAttemptModelDao, Entity, PaymentAttemptSqlDao>() {
@Override
public Long getCount(final PaymentAttemptSqlDao sqlDao, final InternalTenantContext context) {
return sqlDao.getCountByStateNameAcrossTenants(stateName, createdBefore);
}
@Override
public Iterator<PaymentAttemptModelDao> build(final PaymentAttemptSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
return sqlDao.getByStateNameAcrossTenants(stateName, createdBefore, offset, limit);
}
},
offset,
limit,
null
);

}

@Override
Expand Down Expand Up @@ -157,18 +166,29 @@ public List<PaymentAttemptModelDao> inTransaction(final EntitySqlDaoWrapperFacto
}

@Override
public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, final DateTime createdBeforeDate, final DateTime createdAfterDate, final int limit) {
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
@Override
public List<PaymentTransactionModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
final TransactionSqlDao transactional = entitySqlDaoWrapperFactory.become(TransactionSqlDao.class);
public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, final DateTime createdBeforeDate, final DateTime createdAfterDate, final Long offset, final Long limit) {

final Collection<String> allTransactionStatus = ImmutableList.copyOf(Iterables.transform(transactionStatuses, Functions.toStringFunction()));
return transactional.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBeforeDate.toDate(), createdAfterDate.toDate(), limit);
}
});
}
final Collection<String> allTransactionStatus = ImmutableList.copyOf(Iterables.transform(transactionStatuses, Functions.toStringFunction()));
final Date createdBefore = createdBeforeDate.toDate();
final Date createdAfter = createdAfterDate.toDate();

return paginationHelper.getPagination(TransactionSqlDao.class,
new PaginationIteratorBuilder<PaymentTransactionModelDao, PaymentTransaction, TransactionSqlDao>() {
@Override
public Long getCount(final TransactionSqlDao sqlDao, final InternalTenantContext context) {
return sqlDao.getCountByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter);
}

@Override
public Iterator<PaymentTransactionModelDao> build(final TransactionSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
return sqlDao.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter, offset, limit);
}
},
offset,
limit,
null
);
}

@Override
public List<PaymentTransactionModelDao> getPaymentTransactionsByExternalKey(final String transactionExternalKey, final InternalTenantContext context) {
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.killbill.billing.payment.dao;

import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.killbill.billing.callcontext.InternalCallContext;
Expand Down Expand Up @@ -50,7 +51,13 @@ List<PaymentAttemptModelDao> getByPaymentExternalKey(@Bind("paymentExternalKey")
@BindBean final InternalTenantContext context);

@SqlQuery
List<PaymentAttemptModelDao> getByStateNameAcrossTenants(@Bind("stateName") final String stateName,
@Bind("createdBeforeDate") final Date createdBeforeDate);
Long getCountByStateNameAcrossTenants(@Bind("stateName") final String stateName,
@Bind("createdBeforeDate") final Date createdBeforeDate);

@SqlQuery
Iterator<PaymentAttemptModelDao> getByStateNameAcrossTenants(@Bind("stateName") final String stateName,
@Bind("createdBeforeDate") final Date createdBeforeDate,
@Bind("offset") final Long offset,
@Bind("rowCount") final Long rowCount);

}
Expand Up @@ -30,13 +30,13 @@

public interface PaymentDao {

public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit);
public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, final Long offset, final Long limit);

public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);

public void updatePaymentAttempt(UUID paymentAttemptId, UUID transactionId, String state, InternalCallContext context);

public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(String stateName, DateTime createdBeforeDate);
public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(String stateName, DateTime createdBeforeDate, final Long offset, final Long limit);

public List<PaymentAttemptModelDao> getPaymentAttempts(String paymentExternalKey, InternalTenantContext context);

Expand Down
Expand Up @@ -70,8 +70,8 @@ public List<PaymentModelDao> getPaymentsByStatesAcrossTenants(@StateCollectionBi
@SqlQuery
@SmartFetchSize(shouldStream = true)
public Iterator<PaymentModelDao> getByPluginName(@Bind("pluginName") final String pluginName,
@Bind("offset") final Long offset,
@Bind("rowCount") final Long rowCount,
@Bind("offset") final Long offset,
@Bind("rowCount") final Long rowCount,
@BindBean final InternalTenantContext context);

@SqlQuery
Expand Down
Expand Up @@ -19,6 +19,7 @@
import java.math.BigDecimal;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -52,10 +53,16 @@ List<PaymentTransactionModelDao> getPaymentTransactionsByExternalKey(@Bind("tran
@BindBean final InternalTenantContext context);

@SqlQuery
List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
Long getCountByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
@Bind("createdBeforeDate") final Date createdBeforeDate,
@Bind("createdAfterDate") final Date createdAfterDate);

@SqlQuery
Iterator<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
@Bind("createdBeforeDate") final Date createdBeforeDate,
@Bind("createdAfterDate") final Date createdAfterDate,
@Bind("limit") final int limit);
@Bind("offset") final Long offset,
@Bind("rowCount") final Long rowCount);

@SqlQuery
public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
Expand Down
Expand Up @@ -73,6 +73,17 @@ where state_name = :stateName
and created_date \< :createdBeforeDate
<andCheckSoftDeletionWithComma("")>
<defaultOrderBy()>
limit :offset, :rowCount
;
>>

getCountByStateNameAcrossTenants() ::= <<
select
count(1) as count
from <tableName()>
where state_name = :stateName
and created_date \< :createdBeforeDate
<andCheckSoftDeletionWithComma("")>
;
>>

Expand Down
Expand Up @@ -90,7 +90,18 @@ created_date >= :createdAfterDate
and created_date \< :createdBeforeDate
and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
<defaultOrderBy()>
limit :limit
limit :offset, :rowCount
;
>>

getCountByTransactionStatusPriorDateAcrossTenants(statuses) ::= <<
select
count(1) as count
from <tableName()>
where
created_date >= :createdAfterDate
and created_date \< :createdBeforeDate
and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
;
>>

Expand Up @@ -36,6 +36,7 @@
import org.killbill.billing.dao.MockNonEntityDao;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.api.TransactionType;
import org.killbill.billing.util.entity.DefaultPagination;
import org.killbill.billing.util.entity.Pagination;

import com.google.common.base.Predicate;
Expand Down Expand Up @@ -64,8 +65,8 @@ public void reset() {
}

@Override
public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit) {
return ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, Long offset, Long limit) {
final List<PaymentTransactionModelDao> result= ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
@Override
public boolean apply(final PaymentTransactionModelDao input) {
return Iterables.any(transactionStatuses, new Predicate<TransactionStatus>() {
Expand All @@ -76,6 +77,7 @@ public boolean apply(final TransactionStatus transactionStatus) {
});
}
}));
return new DefaultPagination<PaymentTransactionModelDao>(new Long(result.size()), result.iterator());
}

@Override
Expand Down Expand Up @@ -108,7 +110,7 @@ public void updatePaymentAttempt(final UUID paymentAttemptId, final UUID transac
}

@Override
public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate) {
public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate, final Long offset, final Long limit) {
return null;
}

Expand Down

0 comments on commit be2b6b8

Please sign in to comment.