Simplify the way we do transaction polling.
We don't need multiple threads competing to send a TX pull
request. Instead, let's just have one thread.
Mark Needham committed Nov 15, 2016
1 parent 5d1ebe6 commit 95d6bb0
Showing 4 changed files with 103 additions and 82 deletions.
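
For readers skimming the diff, here is a minimal sketch of the pattern the commit switches to: a single scheduled job that reschedules itself only after it has finished, so pull requests can never overlap. It uses a plain java.util.concurrent.ScheduledExecutorService rather than Neo4j's JobScheduler, and all names below are illustrative rather than taken from the commit.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified illustration of single-threaded, self-rescheduling polling.
public class SelfReschedulingPoller
{
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long intervalMillis;
    private volatile boolean stopped;

    public SelfReschedulingPoller( long intervalMillis )
    {
        this.intervalMillis = intervalMillis;
    }

    public void start()
    {
        scheduler.schedule( this::pollOnce, intervalMillis, TimeUnit.MILLISECONDS );
    }

    public void stop()
    {
        stopped = true;
        scheduler.shutdown();
    }

    private void pollOnce()
    {
        try
        {
            if ( stopped )
            {
                return;
            }
            pullTransactions();
        }
        catch ( Throwable e )
        {
            // swallow and retry on the next scheduled run, mirroring the real job's log-and-continue behaviour
        }

        if ( !stopped )
        {
            // reschedule only after this run has completed, so runs never overlap
            scheduler.schedule( this::pollOnce, intervalMillis, TimeUnit.MILLISECONDS );
        }
    }

    private void pullTransactions()
    {
        // hypothetical stand-in for pullAndApplyBatchOfTransactions( core, storeId, batchCount )
    }
}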
@@ -20,6 +20,7 @@
package org.neo4j.causalclustering.catchup.tx;

import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
@@ -28,20 +29,23 @@
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionStrategy;
import org.neo4j.causalclustering.readreplica.CopyStoreSafely;
import org.neo4j.function.Predicates;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;

/**
* This class is responsible for pulling transactions from a core server and queuing
@@ -66,18 +70,33 @@ enum Timeouts implements TimeoutName
private final CopiedStoreRecovery copiedStoreRecovery;
private final CatchUpClient catchUpClient;
private final CoreMemberSelectionStrategy connectionStrategy;
private final RenewableTimeoutService timeoutService;
private final long txPullIntervalMillis;
private final BatchingTxApplier applier;
private final PullRequestMonitor pullRequestMonitor;

private RenewableTimeout timeout;
private volatile JobScheduler.JobHandle handle;
private final JobScheduler scheduler;
private volatile boolean stopped;
private volatile boolean polling;

public static final JobScheduler.Group txPolling = new JobScheduler.Group( "TxPolling", POOLED );

private final BooleanSupplier txPollingCondition = new BooleanSupplier()
{
@Override
public boolean getAsBoolean()
{
return !polling;
}
};


public TxPollingClient( LogProvider logProvider, FileSystemAbstraction fs, LocalDatabase localDatabase,
Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient,
CoreMemberSelectionStrategy connectionStrategy, RenewableTimeoutService timeoutService,
long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors,
CopiedStoreRecovery copiedStoreRecovery )
Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient,
CoreMemberSelectionStrategy connectionStrategy,
long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors,
CopiedStoreRecovery copiedStoreRecovery,
JobScheduler scheduler )
{
this.fs = fs;
this.localDatabase = localDatabase;
@@ -86,45 +105,71 @@ public TxPollingClient( LogProvider logProvider, FileSystemAbstraction fs, Local
this.storeFetcher = storeFetcher;
this.catchUpClient = catchUpClient;
this.connectionStrategy = connectionStrategy;
this.timeoutService = timeoutService;
this.txPullIntervalMillis = txPullIntervalMillis;
this.applier = applier;
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
this.copiedStoreRecovery = copiedStoreRecovery;

this.scheduler = scheduler;
}

@Override
public synchronized void start() throws Throwable
{
timeout = timeoutService.create( TX_PULLER_TIMEOUT, txPullIntervalMillis, 0, timeout -> onTimeout() );
handle = scheduler.schedule( txPolling, job, txPullIntervalMillis, MILLISECONDS );
}

/**
* Time to pull!
*/
private synchronized void onTimeout()
@Override
public void stop() throws Throwable
{
timeout.renew();
applier.emptyQueueAndResetLastQueuedTxId();
stopped = true;
if ( handle != null )
{
handle.cancel( false );
}
}

try
private final Runnable job = new Runnable()
{
@Override
public void run()
{
MemberId core = connectionStrategy.coreMember();
StoreId localStoreId = localDatabase.storeId();
try
{
polling = true;
if ( stopped )
{
return;
}

boolean moreToPull = true;
int batchCount = 1;
while ( moreToPull )
applier.emptyQueueAndResetLastQueuedTxId();
MemberId core = connectionStrategy.coreMember();
StoreId localStoreId = localDatabase.storeId();

boolean moreToPull = true;
int batchCount = 1;
while ( moreToPull )
{
moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount );
batchCount++;
}
}
catch ( Throwable e )
{
moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount );
batchCount++;
log.warn( "Tx pull attempt failed, will retry at the next regularly scheduled polling attempt.", e );
}
finally
{
polling = false;
}

// reschedule only if it is not stopped
if ( !stopped )
{
handle = scheduler.schedule( txPolling, job, txPullIntervalMillis, MILLISECONDS );
}
}
catch ( Throwable e )
{
log.warn( "Tx pull attempt failed, will retry at the next regularly scheduled polling attempt.", e );
}
}
};

private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId, int batchCount ) throws Throwable
{
@@ -139,9 +184,6 @@ private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localSto
public void onTxPullResponse( CompletableFuture<CatchupResult> signal, TxPullResponse response )
{
applier.queue( response.tx() );
// no applying here, just put it in the batch

timeout.renew();
}

@Override
@@ -167,37 +209,19 @@ public void onTxStreamFinishedResponse( CompletableFuture<CatchupResult> signal,
downloadDatabase( core, localStoreId );
return false;
default:
log.info( "Tx pull unable to get transactions > %d " + lastQueuedTxId );
log.info( "Tx pull unable to get transactions > %d ", lastQueuedTxId );
return false;
}
}

private void downloadDatabase( MemberId core, StoreId localStoreId ) throws Throwable
{
pause();
try
{
localDatabase.stop();
startStopOnStoreCopy.stop();
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log ).
copyWholeStoreFrom( core, localStoreId, storeFetcher );
localDatabase.start();
startStopOnStoreCopy.start();
applier.refreshFromNewStore();
}
finally
{
resume();
}
}

public void pause()
{
timeout.cancel();
}

public void resume()
{
timeout.renew();
}
}
@@ -81,6 +81,7 @@
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.internal.DefaultKernelData;
import org.neo4j.kernel.lifecycle.LifeSupport;
@@ -235,9 +236,9 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

TxPollingClient txPuller =
new TxPollingClient( logProvider, fileSystem, localDatabase, servicesToStopOnStoreCopy, storeFetcher,
catchUpClient, new ConnectToRandomCoreMember( discoveryService ), txPullerTimeoutService,
catchUpClient, new ConnectToRandomCoreMember( discoveryService ),
config.get( CausalClusteringSettings.pull_interval ), batchingTxApplier,
platformModule.monitors, copiedStoreRecovery );
platformModule.monitors, copiedStoreRecovery, platformModule.jobScheduler );

dependencies.satisfyDependencies( txPuller );

@@ -24,6 +24,7 @@
import org.mockito.ArgumentCaptor;

import java.io.File;
import java.util.concurrent.TimeUnit;

import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpResponseCallback;
@@ -38,18 +39,25 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.TriggerInfo;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.OnDemandJobScheduler;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT;
import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.txPolling;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;
import static org.neo4j.kernel.impl.util.JobScheduler.Groups.checkPoint;

public class TxPollingClientTest
{
@@ -67,15 +75,17 @@ public class TxPollingClientTest
private final CopiedStoreRecovery copiedStoreRecovery = mock( CopiedStoreRecovery.class );
private final StoreId storeId = new StoreId( 1, 2, 3, 4 );
private final LocalDatabase localDatabase = mock( LocalDatabase.class );

{
when( localDatabase.storeId() ).thenReturn( storeId );
}
private final Lifecycle startStopOnStoreCopy = mock( Lifecycle.class );

private final OnDemandJobScheduler scheduler = spy( new OnDemandJobScheduler() );
private final TxPollingClient txPuller =
new TxPollingClient( NullLogProvider.getInstance(), fs, localDatabase, startStopOnStoreCopy, storeFetcher,
catchUpClient, serverSelection, timeoutService, txPullIntervalMillis, txApplier, new Monitors(),
copiedStoreRecovery );
catchUpClient, serverSelection, txPullIntervalMillis, txApplier, new Monitors(),
copiedStoreRecovery, scheduler );

@Before
public void before() throws Throwable
@@ -93,7 +103,7 @@ public void shouldSendPullRequestOnTick() throws Throwable
when( txApplier.lastQueuedTxId() ).thenReturn( lastAppliedTxId );

// when
timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );
scheduler.runJob();

// then
verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ),
@@ -112,38 +122,27 @@ public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable
any( CatchUpResponseCallback.class ) ))
.thenReturn( CatchupResult.SUCCESS_END_OF_BATCH, CatchupResult.SUCCESS_END_OF_STREAM );

timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );
scheduler.runJob();

// then
verify( catchUpClient, times( 2 ) ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ),
any( CatchUpResponseCallback.class ) );
}

@Test
public void shouldResetTxReceivedTimeoutOnTxReceived() throws Throwable
public void shouldRescheduleTheJobAfterARun() throws Throwable
{
timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );

StoreId storeId = new StoreId( 1, 2, 3, 4 );
ArgumentCaptor<CatchUpResponseCallback> captor = ArgumentCaptor.forClass( CatchUpResponseCallback.class );

verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ),
captor.capture() );

captor.getValue().onTxPullResponse( null,
new TxPullResponse( storeId, mock( CommittedTransactionRepresentation.class ) ) );

verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), times( 2 ) ).renew();
}
// given
Runnable scheduledJob = scheduler.getJob();
assertNotNull( scheduledJob );

@Test
public void shouldRenewTxPullTimeoutOnTick() throws Throwable
{
// when
timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );
scheduler.runJob();

// then
verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).renew();
verify( scheduler, times( 2 ) ).schedule( eq( txPolling ), any( Runnable.class ),
eq( 100L ), eq( TimeUnit.MILLISECONDS ) );
assertEquals( scheduledJob, scheduler.getJob() );
}

@Test
@@ -154,17 +153,14 @@ public void shouldCopyStoreIfCatchUpClientFails() throws Throwable
any( CatchUpResponseCallback.class ) ) ).thenReturn( CatchupResult.E_TRANSACTION_PRUNED );

// when
timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );
scheduler.runJob();

// then
verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).cancel();
verify( localDatabase ).stop();
verify( startStopOnStoreCopy ).stop();
verify( storeFetcher ).copyStore( any( MemberId.class ), eq( storeId ), any( File.class ) );
verify( localDatabase ).start();
verify( startStopOnStoreCopy ).start();
verify( txApplier ).refreshFromNewStore();
verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ),
times( 2 ) /* at the beginning and after store copy */ ).renew();
}
}
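
The updated tests drive the polling job deterministically with OnDemandJobScheduler, which captures the scheduled Runnable instead of running it on a background thread, so each test can trigger a poll explicitly. A rough usage sketch follows, relying only on the schedule( group, job, delay, unit ), getJob() and runJob() calls visible in this diff; the lambda body is a placeholder, not the real polling job.

import java.util.concurrent.TimeUnit;

import org.neo4j.test.OnDemandJobScheduler;

import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.txPolling;

public class OnDemandSchedulingSketch
{
    public static void main( String[] args )
    {
        OnDemandJobScheduler scheduler = new OnDemandJobScheduler();

        // scheduled the same way TxPollingClient.start() does; the job is captured, not executed
        scheduler.schedule( txPolling, () -> System.out.println( "pulling transactions" ),
                100, TimeUnit.MILLISECONDS );

        Runnable job = scheduler.getJob();   // inspect the captured polling job
        scheduler.runJob();                  // run it synchronously, on the caller's thread

        System.out.println( job != null );   // the captured job remains available for assertions
    }
}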
@@ -70,7 +70,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th
ReadReplicaGraphDatabase readReplicaGraphDatabase = cluster.findAnyReadReplica().database();
TxPollingClient pollingClient = readReplicaGraphDatabase.getDependencyResolver()
.resolveDependency( TxPollingClient.class );
pollingClient.pause();
// pollingClient.pause();

cluster.coreTx( ( coreGraphDatabase, transaction ) -> {
coreGraphDatabase.createNode();
Expand All @@ -92,7 +92,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th
}

// when the poller is resumed, it does make it to the read replica
pollingClient.resume();
// pollingClient.resume();
transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) );
}

