Revert "Simplify the way we do transaction polling."
This reverts commit 95d6bb0.
Mark Needham committed Nov 15, 2016
1 parent 11ccf32 commit e6fb7aa
Showing 4 changed files with 82 additions and 103 deletions.
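
In short, this revert swaps the self-rescheduling JobScheduler job back for a RenewableTimeoutService timeout (TX_PULLER_TIMEOUT) that is renewed on every tick and on every received transaction, and cancelled/renewed to pause and resume pulling around a store copy. Below is a minimal sketch of that restored pattern, assuming a plain ScheduledExecutorService as a stand-in for RenewableTimeoutService; the class and the stub method bodies are illustrative only, not the Neo4j API.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: ScheduledExecutorService stands in for RenewableTimeoutService,
// and pullAndApplyBatchOfTransactions() is a stub for the real catch-up call.
class TxPollingSketch
{
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long txPullIntervalMillis;
    private ScheduledFuture<?> timeout;

    TxPollingSketch( long txPullIntervalMillis )
    {
        this.txPullIntervalMillis = txPullIntervalMillis;
    }

    synchronized void start()
    {
        renew(); // corresponds to timeoutService.create( TX_PULLER_TIMEOUT, interval, 0, t -> onTimeout() )
    }

    // "Time to pull!": renew first so the next tick is always scheduled,
    // then pull batches until the upstream reports end of stream.
    private synchronized void onTimeout()
    {
        renew();
        boolean moreToPull = true;
        while ( moreToPull )
        {
            moreToPull = pullAndApplyBatchOfTransactions();
        }
    }

    private boolean pullAndApplyBatchOfTransactions()
    {
        // Stub: the real client issues a TxPullRequest, queues responses on the
        // BatchingTxApplier and renews the timeout for every transaction received.
        return false;
    }

    // pause()/cancel(): stop ticking, e.g. while a whole store is being copied.
    synchronized void pause()
    {
        if ( timeout != null )
        {
            timeout.cancel( false );
        }
    }

    // resume()/renew(): schedule the next tick again.
    synchronized void resume()
    {
        renew();
    }

    private synchronized void renew()
    {
        pause();
        timeout = scheduler.schedule( this::onTimeout, txPullIntervalMillis, TimeUnit.MILLISECONDS );
    }
}
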
@@ -20,7 +20,6 @@
package org.neo4j.causalclustering.catchup.tx;

import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
@@ -29,23 +28,20 @@
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionStrategy;
import org.neo4j.causalclustering.readreplica.CopyStoreSafely;
import org.neo4j.function.Predicates;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;
import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT;

/**
* This class is responsible for pulling transactions from a core server and queuing
@@ -70,33 +66,18 @@ enum Timeouts implements TimeoutName
private final CopiedStoreRecovery copiedStoreRecovery;
private final CatchUpClient catchUpClient;
private final CoreMemberSelectionStrategy connectionStrategy;
private final RenewableTimeoutService timeoutService;
private final long txPullIntervalMillis;
private final BatchingTxApplier applier;
private final PullRequestMonitor pullRequestMonitor;

private volatile JobScheduler.JobHandle handle;
private final JobScheduler scheduler;
private volatile boolean stopped;
private volatile boolean polling;

public static final JobScheduler.Group txPolling = new JobScheduler.Group( "TxPolling", POOLED );

private final BooleanSupplier txPollingCondition = new BooleanSupplier()
{
@Override
public boolean getAsBoolean()
{
return !polling;
}
};

private RenewableTimeout timeout;

public TxPollingClient( LogProvider logProvider, FileSystemAbstraction fs, LocalDatabase localDatabase,
Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient,
CoreMemberSelectionStrategy connectionStrategy,
long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors,
CopiedStoreRecovery copiedStoreRecovery,
JobScheduler scheduler )
Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient,
CoreMemberSelectionStrategy connectionStrategy, RenewableTimeoutService timeoutService,
long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors,
CopiedStoreRecovery copiedStoreRecovery )
{
this.fs = fs;
this.localDatabase = localDatabase;
@@ -105,71 +86,45 @@ public TxPollingClient( LogProvider logProvider, FileSystemAbstraction fs, Local
this.storeFetcher = storeFetcher;
this.catchUpClient = catchUpClient;
this.connectionStrategy = connectionStrategy;
this.timeoutService = timeoutService;
this.txPullIntervalMillis = txPullIntervalMillis;
this.applier = applier;
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
this.copiedStoreRecovery = copiedStoreRecovery;

this.scheduler = scheduler;
}

@Override
public synchronized void start() throws Throwable
{
handle = scheduler.schedule( txPolling, job, txPullIntervalMillis, MILLISECONDS );
timeout = timeoutService.create( TX_PULLER_TIMEOUT, txPullIntervalMillis, 0, timeout -> onTimeout() );
}

@Override
public void stop() throws Throwable
/**
* Time to pull!
*/
private synchronized void onTimeout()
{
stopped = true;
if ( handle != null )
{
handle.cancel( false );
}
}
timeout.renew();
applier.emptyQueueAndResetLastQueuedTxId();

private final Runnable job = new Runnable()
{
@Override
public void run()
try
{
try
{
polling = true;
if ( stopped )
{
return;
}
MemberId core = connectionStrategy.coreMember();
StoreId localStoreId = localDatabase.storeId();

applier.emptyQueueAndResetLastQueuedTxId();
MemberId core = connectionStrategy.coreMember();
StoreId localStoreId = localDatabase.storeId();

boolean moreToPull = true;
int batchCount = 1;
while ( moreToPull )
{
moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount );
batchCount++;
}
}
catch ( Throwable e )
boolean moreToPull = true;
int batchCount = 1;
while ( moreToPull )
{
log.warn( "Tx pull attempt failed, will retry at the next regularly scheduled polling attempt.", e );
}
finally
{
polling = false;
}

// reschedule only if it is not stopped
if ( !stopped )
{
handle = scheduler.schedule( txPolling, job, txPullIntervalMillis, MILLISECONDS );
moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount );
batchCount++;
}
}
};
catch ( Throwable e )
{
log.warn( "Tx pull attempt failed, will retry at the next regularly scheduled polling attempt.", e );
}
}

private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId, int batchCount ) throws Throwable
{
@@ -184,6 +139,9 @@ private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localSto
public void onTxPullResponse( CompletableFuture<CatchupResult> signal, TxPullResponse response )
{
applier.queue( response.tx() );
// no applying here, just put it in the batch

timeout.renew();
}

@Override
@@ -209,19 +167,37 @@ public void onTxStreamFinishedResponse( CompletableFuture<CatchupResult> signal,
downloadDatabase( core, localStoreId );
return false;
default:
log.info( "Tx pull unable to get transactions > %d ", lastQueuedTxId );
log.info( "Tx pull unable to get transactions > %d " + lastQueuedTxId );
return false;
}
}

private void downloadDatabase( MemberId core, StoreId localStoreId ) throws Throwable
{
pause();
try
{
localDatabase.stop();
startStopOnStoreCopy.stop();
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log ).
copyWholeStoreFrom( core, localStoreId, storeFetcher );
localDatabase.start();
startStopOnStoreCopy.start();
applier.refreshFromNewStore();
}
finally
{
resume();
}
}

public void pause()
{
timeout.cancel();
}

public void resume()
{
timeout.renew();
}
}
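
The restored downloadDatabase above cancels the pull timeout for the duration of the whole-store copy and renews it afterwards. A brief usage sketch against the hypothetical TxPollingSketch stand-in from the commit summary; copyWholeStore here is a placeholder Runnable, not a real method in this codebase.

// Usage sketch (hypothetical names): pulling stays paused for the whole store
// copy and is resumed in a finally block, mirroring downloadDatabase above.
class StoreCopySketch
{
    static void copyStoreWhilePaused( TxPollingSketch poller, Runnable copyWholeStore )
    {
        poller.pause();
        try
        {
            copyWholeStore.run(); // placeholder for the stop/copy/start/refresh sequence
        }
        finally
        {
            poller.resume();
        }
    }
}
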
@@ -81,7 +81,6 @@
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.internal.DefaultKernelData;
import org.neo4j.kernel.lifecycle.LifeSupport;
@@ -236,9 +235,9 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

TxPollingClient txPuller =
new TxPollingClient( logProvider, fileSystem, localDatabase, servicesToStopOnStoreCopy, storeFetcher,
catchUpClient, new ConnectToRandomCoreMember( discoveryService ),
catchUpClient, new ConnectToRandomCoreMember( discoveryService ), txPullerTimeoutService,
config.get( CausalClusteringSettings.pull_interval ), batchingTxApplier,
platformModule.monitors, copiedStoreRecovery, platformModule.jobScheduler );
platformModule.monitors, copiedStoreRecovery );

dependencies.satisfyDependencies( txPuller );

@@ -24,7 +24,6 @@
import org.mockito.ArgumentCaptor;

import java.io.File;
import java.util.concurrent.TimeUnit;

import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpResponseCallback;
@@ -39,25 +38,18 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.TriggerInfo;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.OnDemandJobScheduler;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT;
import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.txPolling;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;
import static org.neo4j.kernel.impl.util.JobScheduler.Groups.checkPoint;

public class TxPollingClientTest
{
@@ -75,17 +67,15 @@ public class TxPollingClientTest
private final CopiedStoreRecovery copiedStoreRecovery = mock( CopiedStoreRecovery.class );
private final StoreId storeId = new StoreId( 1, 2, 3, 4 );
private final LocalDatabase localDatabase = mock( LocalDatabase.class );

{
when( localDatabase.storeId() ).thenReturn( storeId );
}
private final Lifecycle startStopOnStoreCopy = mock( Lifecycle.class );

private final OnDemandJobScheduler scheduler = spy( new OnDemandJobScheduler());
private final TxPollingClient txPuller =
new TxPollingClient( NullLogProvider.getInstance(), fs, localDatabase, startStopOnStoreCopy, storeFetcher,
catchUpClient, serverSelection, txPullIntervalMillis, txApplier, new Monitors(),
copiedStoreRecovery, scheduler );
catchUpClient, serverSelection, timeoutService, txPullIntervalMillis, txApplier, new Monitors(),
copiedStoreRecovery );

@Before
public void before() throws Throwable
@@ -103,7 +93,7 @@ public void shouldSendPullRequestOnTick() throws Throwable
when( txApplier.lastQueuedTxId() ).thenReturn( lastAppliedTxId );

// when
scheduler.runJob();
timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );

// then
verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ),
@@ -122,27 +112,38 @@ public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable
any( CatchUpResponseCallback.class ) ))
.thenReturn( CatchupResult.SUCCESS_END_OF_BATCH, CatchupResult.SUCCESS_END_OF_STREAM );

scheduler.runJob();
timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );

// then
verify( catchUpClient, times( 2 ) ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ),
any( CatchUpResponseCallback.class ) );
}

@Test
public void shouldRescheduleTheJobAfterARun() throws Throwable
public void shouldResetTxReceivedTimeoutOnTxReceived() throws Throwable
{
// given
Runnable scheduledJob = scheduler.getJob();
assertNotNull( scheduledJob );
timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );

StoreId storeId = new StoreId( 1, 2, 3, 4 );
ArgumentCaptor<CatchUpResponseCallback> captor = ArgumentCaptor.forClass( CatchUpResponseCallback.class );

verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ),
captor.capture() );

captor.getValue().onTxPullResponse( null,
new TxPullResponse( storeId, mock( CommittedTransactionRepresentation.class ) ) );

verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), times( 2 ) ).renew();
}

@Test
public void shouldRenewTxPullTimeoutOnTick() throws Throwable
{
// when
scheduler.runJob();
timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );

// then
verify( scheduler, times( 2 ) ).schedule( eq( txPolling ), any( Runnable.class ),
eq( 100L ), eq( TimeUnit.MILLISECONDS ) );
assertEquals( scheduledJob, scheduler.getJob() );
verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).renew();
}

@Test
@@ -153,14 +154,17 @@ public void shouldCopyStoreIfCatchUpClientFails() throws Throwable
any( CatchUpResponseCallback.class ) ) ).thenReturn( CatchupResult.E_TRANSACTION_PRUNED );

// when
scheduler.runJob();
timeoutService.invokeTimeout( TX_PULLER_TIMEOUT );

// then
verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).cancel();
verify( localDatabase ).stop();
verify( startStopOnStoreCopy ).stop();
verify( storeFetcher ).copyStore( any( MemberId.class ), eq( storeId ), any( File.class ) );
verify( localDatabase ).start();
verify( startStopOnStoreCopy ).start();
verify( txApplier ).refreshFromNewStore();
verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ),
times( 2 ) /* at the beginning and after store copy */ ).renew();
}
}
@@ -70,7 +70,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th
ReadReplicaGraphDatabase readReplicaGraphDatabase = cluster.findAnyReadReplica().database();
TxPollingClient pollingClient = readReplicaGraphDatabase.getDependencyResolver()
.resolveDependency( TxPollingClient.class );
// pollingClient.pause();
pollingClient.pause();

cluster.coreTx( ( coreGraphDatabase, transaction ) -> {
coreGraphDatabase.createNode();
@@ -92,7 +92,7 @@
}

// when the poller is resumed, it does make it to the read replica
// pollingClient.resume();
pollingClient.resume();
transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) );
}
