Skip to content

Commit

Permalink
Send only a batch of transactions in response to each pull request
Browse files Browse the repository at this point in the history
  • Loading branch information
apcj authored and Mark Needham committed Nov 5, 2016
1 parent d09fe67 commit d79f4b0
Show file tree
Hide file tree
Showing 17 changed files with 233 additions and 175 deletions.
Expand Up @@ -21,7 +21,8 @@

public enum CatchupResult
{
SUCCESS,
SUCCESS_END_OF_BATCH,
SUCCESS_END_OF_STREAM,
E_STORE_ID_MISMATCH,
E_STORE_UNAVAILABLE,
E_TRANSACTION_PRUNED
Expand Down
Expand Up @@ -60,7 +60,6 @@
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.NeoStoreDataSource;
Expand All @@ -75,7 +74,6 @@

public class CatchupServer extends LifecycleAdapter
{
private static final Setting<ListenSocketAddress> setting = CausalClusteringSettings.transaction_listen_address;
private final LogProvider logProvider;
private final Log log;
private final Log userLog;
Expand All @@ -94,6 +92,7 @@ public class CatchupServer extends LifecycleAdapter
private EventLoopGroup workerGroup;
private Channel channel;
private Supplier<CheckPointer> checkPointerSupplier;
private int txPullBatchSize;

public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier,
Expand All @@ -102,7 +101,8 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp
CoreState coreState, Config config, Monitors monitors, Supplier<CheckPointer> checkPointerSupplier )
{
this.coreState = coreState;
this.listenAddress = config.get( setting );
this.listenAddress = config.get( CausalClusteringSettings.transaction_listen_address );
this.txPullBatchSize = config.get( CausalClusteringSettings.tx_pull_batch_size );
this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.storeIdSupplier = storeIdSupplier;
this.dataSourceAvailabilitySupplier = dataSourceAvailabilitySupplier;
Expand Down Expand Up @@ -159,8 +159,8 @@ protected void initChannel( SocketChannel ch )

pipeline.addLast(
new TxPullRequestHandler( protocol, storeIdSupplier, dataSourceAvailabilitySupplier,
transactionIdStoreSupplier, logicalTransactionStoreSupplier, monitors,
logProvider ) );
transactionIdStoreSupplier, logicalTransactionStoreSupplier, txPullBatchSize,
monitors, logProvider ) );
pipeline.addLast( new ChunkedWriteHandler() );
pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier,
checkPointerSupplier ) );
Expand All @@ -185,8 +185,8 @@ protected void initChannel( SocketChannel ch )
//noinspection ConstantConditions
if ( e instanceof BindException )
{
userLog.error( "Address is already bound for setting: " + setting + " with value: " + listenAddress );
log.error( "Address is already bound for setting: " + setting + " with value: " + listenAddress, e );
userLog.error( "Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address + " with value: " + listenAddress );
log.error( "Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address + " with value: " + listenAddress, e );
throw e;
}
}
Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS;
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM;

public class StoreFetcher
{
Expand Down Expand Up @@ -97,7 +97,7 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
log.info( "Store files need to be recovered starting from: %d", pullTxIndex );

CatchupResult catchupResult = pullTransactions( from, expectedStoreId, destDir, pullTxIndex );
if ( catchupResult != SUCCESS )
if ( catchupResult != SUCCESS_END_OF_STREAM )
{
throw new StreamingTransactionsFailedException( "Failed to pull transactions: " + catchupResult );
}
Expand Down
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.causalclustering.catchup.tx;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.neo4j.kernel.impl.api.TransactionCommitProcess;
Expand All @@ -34,15 +32,13 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.neo4j.function.Predicates.awaitForever;
import static org.neo4j.kernel.impl.transaction.tracing.CommitEvent.NULL;
import static org.neo4j.storageengine.api.TransactionApplicationMode.EXTERNAL;

/**
* Accepts transactions and queues them up for being applied in batches.
*/
public class BatchingTxApplier extends LifecycleAdapter implements Runnable
public class BatchingTxApplier extends LifecycleAdapter
{
private final int maxBatchSize;
private final Supplier<TransactionIdStore> txIdStoreSupplier;
Expand All @@ -52,13 +48,10 @@ public class BatchingTxApplier extends LifecycleAdapter implements Runnable
private final PullRequestMonitor monitor;
private final Log log;

private final ArrayBlockingQueue<CommittedTransactionRepresentation> txQueue;

private TransactionQueue txBatcher;
private TransactionQueue txQueue;
private TransactionCommitProcess commitProcess;

private volatile long lastQueuedTxId;
private volatile long lastAppliedTxId;
private volatile boolean stopped;

public BatchingTxApplier( int maxBatchSize, Supplier<TransactionIdStore> txIdStoreSupplier,
Expand All @@ -71,15 +64,14 @@ public BatchingTxApplier( int maxBatchSize, Supplier<TransactionIdStore> txIdSto
this.healthSupplier = healthSupplier;
this.log = logProvider.getLog( getClass() );
this.monitor = monitors.newMonitor( PullRequestMonitor.class );
this.txQueue = new ArrayBlockingQueue<>( maxBatchSize );
}

@Override
public void start()
{
stopped = false;
refreshFromNewStore();
txBatcher =
txQueue =
new TransactionQueue( maxBatchSize, ( first, last ) -> commitProcess.commit( first, NULL, EXTERNAL ) );
}

Expand All @@ -91,9 +83,8 @@ public void stop()

void refreshFromNewStore()
{
assert txQueue.isEmpty();
assert txBatcher == null || txBatcher.isEmpty();
lastQueuedTxId = lastAppliedTxId = txIdStoreSupplier.get().getLastCommittedTransactionId();
assert txQueue == null || txQueue.isEmpty();
lastQueuedTxId = txIdStoreSupplier.get().getLastCommittedTransactionId();
commitProcess = commitProcessSupplier.get();
}

Expand All @@ -109,68 +100,45 @@ public void queue( CommittedTransactionRepresentation tx )

if ( receivedTxId != expectedTxId )
{
log.warn( "[" + Thread.currentThread() + "] Out of order transaction. Received: " + receivedTxId +
" Expected: " + expectedTxId );
log.warn( "Out of order transaction. Received: %d Expected: %d", receivedTxId, expectedTxId );
return;
}

awaitForever( () -> stopped || txQueue.offer( tx ), 100, TimeUnit.MILLISECONDS );
try
{
txQueue.queue( new TransactionToApply( tx.getTransactionRepresentation(), receivedTxId ) );
}
catch ( Exception e )
{
log.error( "Error while queueing transaction", e );
healthSupplier.get().panic( e );
}

if ( !stopped )
{
lastQueuedTxId = receivedTxId;
monitor.txPullResponse( receivedTxId );
}
}

@Override
public void run()
void applyBatch()
{
CommittedTransactionRepresentation tx = null;
try
{
tx = txQueue.poll( 1, SECONDS );
txQueue.empty();
}
catch ( InterruptedException e )
catch ( Exception e )
{
log.warn( "Not expecting to be interrupted", e );
log.error( "Error during transaction application", e );
healthSupplier.get().panic( e );
}

if ( tx != null )
{
long txId;
try
{
do
{
txId = tx.getCommitEntry().getTxId();
txBatcher.queue( new TransactionToApply( tx.getTransactionRepresentation(), txId ) );
}
while ( (tx = txQueue.poll()) != null );

txBatcher.empty();
lastAppliedTxId = txId;
}
catch ( Exception e )
{
log.error( "Error during transaction application", e );
healthSupplier.get().panic( e );
}
}
}

/**
* @return True if there is pending work, and false otherwise.
*/
boolean workPending()
{
return lastQueuedTxId > lastAppliedTxId;
}

/**
* @return The id of the last transaction applied.
*/
long lastAppliedTxId()
long lastQueuedTxId()
{
return lastAppliedTxId;
return lastQueuedTxId;
}
}
Expand Up @@ -30,10 +30,10 @@
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName;
import org.neo4j.causalclustering.readreplica.CopyStoreSafely;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionStrategy;
import org.neo4j.causalclustering.readreplica.CopyStoreSafely;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -101,52 +101,16 @@ public synchronized void start() throws Throwable
private synchronized void onTimeout()
{
timeout.renew();
if ( applier.workPending() )
{
log.info( "Still applying old batch, delay pulling to the next interval. Up to tx %d",
applier.lastAppliedTxId() );
return;
}

try
{
MemberId core = connectionStrategy.coreMember();
long lastAppliedTxId = applier.lastAppliedTxId();
pullRequestMonitor.txPullRequest( lastAppliedTxId );
StoreId localStoreId = localDatabase.storeId();
TxPullRequest txPullRequest = new TxPullRequest( lastAppliedTxId, localStoreId );
log.info( "[" + Thread.currentThread() + "] Starting transaction pull from " + lastAppliedTxId );
CatchupResult catchupResult =
catchUpClient.makeBlockingRequest( core, txPullRequest, new CatchUpResponseAdaptor<CatchupResult>()
{
@Override
public void onTxPullResponse( CompletableFuture<CatchupResult> signal, TxPullResponse response )
{
applier.queue( response.tx() );
timeout.renew();
}

@Override
public void onTxStreamFinishedResponse( CompletableFuture<CatchupResult> signal,
TxStreamFinishedResponse response )
{
signal.complete( response.status() );
}
} );

switch ( catchupResult )

boolean moreToPull = true;
while ( moreToPull )
{
case SUCCESS:
log.debug( "[" + Thread.currentThread() + "] Successfully completed transaction pull from " + lastAppliedTxId );
break;
case E_TRANSACTION_PRUNED:
log.info( "[" + Thread.currentThread() + "] Tx pull unable to get transactions starting from " + lastAppliedTxId +
" such transaction have been pruned. Attempting a store copy." );
downloadDatabase( core, localStoreId );
break;
default:
log.info( "[" + Thread.currentThread() + "] Tx pull unable to get transactions starting from " + lastAppliedTxId );
break;
moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId );
}
}
catch ( Throwable e )
Expand All @@ -155,6 +119,52 @@ public void onTxStreamFinishedResponse( CompletableFuture<CatchupResult> signal,
}
}

private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId ) throws Throwable
{
long lastQueuedTxId = applier.lastQueuedTxId();
pullRequestMonitor.txPullRequest( lastQueuedTxId );
TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId );
log.info( "Pull transactions where tx id > %d", lastQueuedTxId );
CatchupResult catchupResult =
catchUpClient.makeBlockingRequest( core, txPullRequest, new CatchUpResponseAdaptor<CatchupResult>()
{
@Override
public void onTxPullResponse( CompletableFuture<CatchupResult> signal, TxPullResponse response )
{
applier.queue( response.tx() );
// no applying here, just put it in the batch

timeout.renew();
}

@Override
public void onTxStreamFinishedResponse( CompletableFuture<CatchupResult> signal,
TxStreamFinishedResponse response )
{
// apply the batch here
applier.applyBatch();
signal.complete( response.status() );
}
} );

switch ( catchupResult )
{
case SUCCESS_END_OF_BATCH:
return true;
case SUCCESS_END_OF_STREAM:
log.debug( "Successfully pulled transactions from %d", lastQueuedTxId );
return false;
case E_TRANSACTION_PRUNED:
log.info( "Tx pull unable to get transactions starting from %d since transactions " +
"have been pruned. Attempting a store copy.", lastQueuedTxId ) ;
downloadDatabase( core, localStoreId );
return false;
default:
log.info( "Tx pull unable to get transactions > %d " + lastQueuedTxId );
return false;
}
}

private void downloadDatabase( MemberId core, StoreId localStoreId ) throws Throwable
{
pause();
Expand Down

0 comments on commit d79f4b0

Please sign in to comment.