SequentialTransactionsCoordinator

James Hu edited this page Mar 2, 2016 · 12 revisions

Background

Sometimes we need to consume or process a series of data items, messages, or events. As the number of items grows, we have to process them in small chunks/batches, and the chance of a processing failure increases. We therefore need a mechanism that ensures the following:

  • We should be able to divide the data stream into smaller batches
  • Each data item in the stream must be included in one and only one batch
  • Multiple threads/servers should be able to work on different batches on the same data stream at the same time
  • Batch processing should be transactional, allowing the processing to start, abort, finish, and time out. Data items never move between batches, and all data items in a batch always share the same transaction state (started/in-progress/aborted/timed-out/finished)
  • Failed batches will be retried
  • States of the past transactional batches should be query-able
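The transaction states listed above can be pictured as a small state machine. The sketch below is only an illustration inferred from these requirements: the enum name `BatchState`, its constants, and the allowed transitions are assumptions, not types taken from the library.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical state model inferred from the requirements list;
// none of these names come from the actual library.
enum BatchState {
    IN_PROGRESS, ABORTED, TIMED_OUT, FINISHED;

    // States that this state may move to (assumed transitions).
    Set<BatchState> next() {
        switch (this) {
            case IN_PROGRESS:
                return EnumSet.of(ABORTED, TIMED_OUT, FINISHED);
            case ABORTED:
            case TIMED_OUT:
                // A failed batch is retried, so it goes back to in-progress.
                return EnumSet.of(IN_PROGRESS);
            default:
                // FINISHED is terminal.
                return EnumSet.noneOf(BatchState.class);
        }
    }

    boolean canMoveTo(BatchState target) {
        return next().contains(target);
    }

    // Failed batches are the ones eligible for retry.
    boolean isRetryable() {
        return this == ABORTED || this == TIMED_OUT;
    }
}
```

For example, `BatchState.ABORTED.isRetryable()` is true, while `BatchState.FINISHED.canMoveTo(BatchState.IN_PROGRESS)` is false.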

Concepts

The diagram below shows the basic concepts:

[Diagram: Sequential Transactions]

Data items in the stream are allocated to 6 batches/transactions, numbered from #132 to #137. They are called SequentialTransactions because they form a sequence without any gaps and their order never changes. Transactions don't need to cover the same number of data items; for example, #136 has more data items than the others.

Once created, the start and end positions of a transaction cannot change, with one exception: the last transaction can be created with an open end position. That end position can later be changed from open to a specific position, but once this has been done it cannot be changed back to open.

New transactions are always created after the last one. When the last transaction has an open end position, no new transaction can be created because there is no way to determine the start position of the new transaction.
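The rules above (no gaps, fixed positions, an optionally open-ended last transaction) can be sketched as a toy in-memory model. This is not the library's implementation: `ToyTransactionSequence` and its methods are hypothetical, positions are simplified to `long` values, and "the next start position is the previous end position plus one" is an assumption made only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the sequence rules; not the real coordinator.
final class ToyTransactionSequence {
    private static final class Range {
        final long start;
        Long end; // null means an open end position

        Range(long start, Long end) {
            this.start = start;
            this.end = end;
        }
    }

    private final List<Range> ranges = new ArrayList<>();

    // Appends a transaction after the last one. Returns false when the last
    // transaction is still open-ended, because the new start is then unknown.
    boolean append(long start, Long end) {
        if (!ranges.isEmpty()) {
            Range last = ranges.get(ranges.size() - 1);
            if (last.end == null) {
                return false;
            }
            if (start != last.end + 1) {
                throw new IllegalArgumentException("gap or overlap in sequence");
            }
        }
        if (end != null && end < start) {
            throw new IllegalArgumentException("end before start");
        }
        ranges.add(new Range(start, end));
        return true;
    }

    // Fixes the open end position of the last transaction. Once fixed,
    // it can neither be changed nor reopened.
    void closeLast(long end) {
        if (ranges.isEmpty()) {
            throw new IllegalStateException("no transaction to close");
        }
        Range last = ranges.get(ranges.size() - 1);
        if (last.end != null) {
            throw new IllegalStateException("end position is already fixed");
        }
        if (end < last.start) {
            throw new IllegalArgumentException("end before start");
        }
        last.end = end;
    }

    int size() {
        return ranges.size();
    }
}
```

In this model, appending `[100, open)` after `[0, 99]` succeeds, a further `append` is refused until `closeLast(199)` is called, and only then can `[200, 299]` be added.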

Transactions are independent of each other. Because multiple threads/nodes can work on different transactions at the same time, this mechanism allows more parallelism than the traditional "checkpoint" mechanism, which keeps track of only one checkpoint per data stream.

SequentialTransactionsCoordinator

The SequentialTransactionsCoordinator interface is an abstraction of the mechanism described above. Currently there are two implementations:

  • InMemSequentialTransactionsCoordinator - A single VM implementation that keeps everything in memory. It is good for testing/PoC.
  • AzureSequentialTransactionsCoordinator - An implementation using Microsoft Azure Table Storage for storage and concurrency control. It is good for production usage.

Starting transactions

The most important methods in SequentialTransactionsCoordinator are the startTransaction(...) methods. A processor calls startTransaction(...) to get either a failed transaction to retry or information about the last transaction, which it needs in order to start a new one in the next call. If a previously failed transaction is available, it is restarted and returned. If there is no failed transaction but it is possible to start a new one, information about the last transaction is returned. Otherwise, null is returned.

If information about the last transaction is returned, the processor can use its end position to determine the start position of a new transaction, and then call startTransaction(...) again with the relevant information to request that the new transaction be created and started. This second call may still return a previously failed and restarted transaction if one is available, or the newly created and started transaction, or null if another processor won the race to create the new transaction.

Below is an example showing how to start transactions:

while(state.get() == STATE_RUNNING){
  long startTime = System.currentTimeMillis();
  int attempts = 0;
  SequentialTransaction transaction = null;
  try{
    do{
      attempts++;
      try {
        transaction = txCoordinator.startTransaction(seriesId, processorId, 
            processorOptions.getInitialTransactionTimeoutDuration(), 
            processorOptions.getMaxInProgressTransactions(), processorOptions.getMaxRetringTransactions());
      } catch (Exception e) {
        logger.warn("[{}] startTransaction(...) failed", seriesId, e);
        await();
      }
    }while(transaction == null && state.get() == STATE_RUNNING);
    
    // got information about the last transaction, try to start a new one after that
    while (transaction != null && !transaction.hasStarted() && state.get() == STATE_RUNNING){
      String previousTransactionId = transaction.getTransactionId();
      String previousEndPosition = transaction.getEndPosition();
      transaction.setTransactionId(null);
      String startPosition = null;
      if (previousEndPosition == null){
        startPosition = START_POSITION;
      }else{
        startPosition = supplier.nextStartPosition(previousEndPosition);
      }
      if (!supplier.isInRange(startPosition, END_POSITION)){
        transaction = null;
        break;
      }
      transaction.setStartPosition(startPosition.toString());
      transaction.setEndPositionNull();  // for an open range transaction
      transaction.setTimeout(processorOptions.getInitialTransactionTimeoutDuration());
      attempts++;
      transaction = txCoordinator.startTransaction(seriesId, previousTransactionId, previousEndPosition, transaction, 
          processorOptions.getMaxInProgressTransactions(), processorOptions.getMaxRetringTransactions());
    }
  }catch(TransactionStorageInfrastructureException e){
    logger.debug("[{}] Transaction storage infrastructure error happened", seriesId, e);
  }catch(Exception e){
    logger.error("[{}] Error happened", seriesId, e);
  }
  
  if (transaction != null && transaction.hasStarted() && state.get() == STATE_RUNNING){
    logger.debug("Got a {} transaction {} [{}-{}] after {} attempts: {}", 
        (transaction.getAttempts() == 1 ? "new" : "failed"),
        transaction.getTransactionId(),
        transaction.getStartPosition(), transaction.getEndPosition(),
        attempts,
        DurationFormatter.formatSince(startTime));
    doTransaction(supplier, transaction);  // in this method, finish or abort the transaction
  }else{
    await();  // no work to do, rest for a while
  }
}  // state.get() == STATE_RUNNING

Performing transactions

Once startTransaction(...) returns a transaction for which transaction != null && transaction.hasStarted() holds, congratulations: you are now the owner of this transaction!

Keep in mind that every transaction has a timeout. The initial timeout value is passed as an argument to the startTransaction(...) method, and the timeout can be renewed after the transaction has started. This is useful when the time required to finish a transaction cannot be estimated accurately before processing actually starts. If the processing is running out of time, remember to call the renewTransactionTimeout(...) method.

When the work is done, call the finishTransaction(...) method to finish the transaction. If anything unexpected happens, call abortTransaction(...) to abort it instead. An aborted transaction will be picked up for retry by a later startTransaction(...) call.
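The renew/finish/abort flow can be sketched as follows. Because the parameter lists of the coordinator methods are not shown on this page, the sketch deliberately avoids the real API: `OwnedTransaction`, `BatchWorker`, and the `margin` parameter are hypothetical stand-ins, and the renewal policy (renew whenever less than `margin` remains) is just one possible choice.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the coordinator calls on one owned transaction.
interface OwnedTransaction {
    void renewTimeout(Duration extension); // stands in for renewTransactionTimeout(...)
    void finish();                         // stands in for finishTransaction(...)
    void abort();                          // stands in for abortTransaction(...)
}

// Hypothetical helper showing one way to structure the work on a batch.
final class BatchWorker {
    // Processes the items one by one. Renews the timeout when less than
    // 'margin' remains, aborts on any runtime failure, finishes otherwise.
    static <T> boolean process(OwnedTransaction tx, List<T> items, Consumer<T> handler,
                               Duration timeout, Duration margin) {
        Instant deadline = Instant.now().plus(timeout);
        try {
            for (T item : items) {
                Duration remaining = Duration.between(Instant.now(), deadline);
                if (remaining.compareTo(margin) < 0) {
                    tx.renewTimeout(timeout);
                    deadline = Instant.now().plus(timeout);
                }
                handler.accept(item);
            }
        } catch (RuntimeException e) {
            // Something unexpected happened: give the batch back for retry.
            tx.abort();
            return false;
        }
        tx.finish();
        return true;
    }
}
```

Checking the remaining time before each item keeps the renewal logic in one place; a real processor might instead renew from a background timer if single items can take longer than the margin.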

You can update the transaction detail field with the updateTransaction(...) methods if you like. The detail field is owned by the application, so you can do whatever you want with it.

Transaction also has a getAttempts() method that tells you whether you are working on a transaction for the first time or retrying one that has failed before.