Skip to content

Commit

Permalink
Cope with batched transactions in catchup.
Browse files Browse the repository at this point in the history
  • Loading branch information
apcj authored and Mark Needham committed Nov 7, 2016
1 parent d79f4b0 commit 32b4a14
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 25 deletions.
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.catchup;

public class TxPullRequestResult
{
private final CatchupResult catchupResult;
private final long lastTxId;

public TxPullRequestResult( CatchupResult catchupResult, long lastTxId )
{
this.catchupResult = catchupResult;
this.lastTxId = lastTxId;
}

public CatchupResult catchupResult()
{
return catchupResult;
}

public long lastTxId()
{
return lastTxId;
}
}
Expand Up @@ -24,6 +24,7 @@


import org.neo4j.causalclustering.catchup.CatchUpClientException; import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchupResult; import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.TxPullRequestResult;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter;
import org.neo4j.causalclustering.catchup.tx.TxPullClient; import org.neo4j.causalclustering.catchup.tx.TxPullClient;
Expand All @@ -35,6 +36,7 @@
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_BATCH;
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM; import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM;


public class StoreFetcher public class StoreFetcher
Expand Down Expand Up @@ -68,19 +70,6 @@ public CatchupResult tryCatchingUp( MemberId from, StoreId expectedStoreId, File
return pullTransactions( from, expectedStoreId, storeDir, lastCommittedTxId ); return pullTransactions( from, expectedStoreId, storeDir, lastCommittedTxId );
} }


private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId, File storeDir, long fromTxId ) throws IOException, StoreCopyFailedException
{
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) )
{
log.info( "Pulling transactions from: %d", fromTxId );
return txPullClient.pullTransactions( from, expectedStoreId, fromTxId, writer );
}
catch ( CatchUpClientException e )
{
throw new StoreCopyFailedException( e );
}
}

public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
throws StoreCopyFailedException, StreamingTransactionsFailedException throws StoreCopyFailedException, StreamingTransactionsFailedException
{ {
Expand Down Expand Up @@ -108,6 +97,31 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
} }
} }


private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId, File storeDir, long fromTxId ) throws IOException, StoreCopyFailedException
{
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) )
{
log.info( "Pulling transactions from: %d", fromTxId );

long pullRequestTxId = fromTxId;

CatchupResult lastStatus;
do
{
TxPullRequestResult result = txPullClient.pullTransactions( from, expectedStoreId, pullRequestTxId, writer );
lastStatus = result.catchupResult();
pullRequestTxId = result.lastTxId();
}
while ( lastStatus == SUCCESS_END_OF_BATCH );

return lastStatus;
}
catch ( CatchUpClientException e )
{
throw new StoreCopyFailedException( e );
}
}

public StoreId getStoreIdOf( MemberId from ) throws StoreIdDownloadFailedException public StoreId getStoreIdOf( MemberId from ) throws StoreIdDownloadFailedException
{ {
return storeCopyClient.fetchStoreId( from ); return storeCopyClient.fetchStoreId( from );
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpClientException; import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.catchup.CatchupResult; import org.neo4j.causalclustering.catchup.TxPullRequestResult;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
Expand All @@ -40,24 +40,29 @@ public TxPullClient( CatchUpClient catchUpClient, Monitors monitors )
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
} }


public CatchupResult pullTransactions( MemberId from, StoreId storeId, long startTxId, public TxPullRequestResult pullTransactions( MemberId from, StoreId storeId, long startTxId,
TxPullResponseListener txPullResponseListener ) throws CatchUpClientException TxPullResponseListener txPullResponseListener )
throws CatchUpClientException
{ {
pullRequestMonitor.txPullRequest( startTxId ); pullRequestMonitor.txPullRequest( startTxId );
return catchUpClient.makeBlockingRequest( from, new TxPullRequest( startTxId, storeId ), return catchUpClient.makeBlockingRequest( from, new TxPullRequest( startTxId, storeId ),
new CatchUpResponseAdaptor<CatchupResult>() new CatchUpResponseAdaptor<TxPullRequestResult>()
{ {
private long lastTxIdReceived = startTxId;

@Override @Override
public void onTxPullResponse( CompletableFuture<CatchupResult> signal, TxPullResponse response ) public void onTxPullResponse( CompletableFuture<TxPullRequestResult> signal,
TxPullResponse response )
{ {
this.lastTxIdReceived = response.tx().getCommitEntry().getTxId();
txPullResponseListener.onTxReceived( response ); txPullResponseListener.onTxReceived( response );
} }


@Override @Override
public void onTxStreamFinishedResponse( CompletableFuture<CatchupResult> signal, public void onTxStreamFinishedResponse( CompletableFuture<TxPullRequestResult> signal,
TxStreamFinishedResponse response ) TxStreamFinishedResponse response )
{ {
signal.complete( response.status() ); signal.complete( new TxPullRequestResult(response.status(), lastTxIdReceived ));
} }
} ); } );
} }
Expand Down
Expand Up @@ -185,8 +185,6 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
BatchingTxApplier batchingTxApplier = new BatchingTxApplier( maxBatchSize, BatchingTxApplier batchingTxApplier = new BatchingTxApplier( maxBatchSize,
dependencies.provideDependency( TransactionIdStore.class ), dependencies.provideDependency( TransactionIdStore.class ),
writableCommitProcess, databaseHealthSupplier, platformModule.monitors, logProvider ); writableCommitProcess, databaseHealthSupplier, platformModule.monitors, logProvider );
// ContinuousJob txApplyJob = new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group(
// "tx-applier", NEW_THREAD ), batchingTxApplier, logProvider );


DelayedRenewableTimeoutService txPullerTimeoutService = DelayedRenewableTimeoutService txPullerTimeoutService =
new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider ); new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider );
Expand Down Expand Up @@ -215,7 +213,6 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
dependencies.satisfyDependencies( txPuller ); dependencies.satisfyDependencies( txPuller );


txPulling.add( batchingTxApplier ); txPulling.add( batchingTxApplier );
// txPulling.add( txApplyJob );
txPulling.add( txPuller ); txPulling.add( txPuller );
txPulling.add( txPullerTimeoutService ); txPulling.add( txPullerTimeoutService );


Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.UUID; import java.util.UUID;


import org.neo4j.causalclustering.catchup.TxPullRequestResult;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter;
import org.neo4j.causalclustering.catchup.tx.TxPullClient; import org.neo4j.causalclustering.catchup.tx.TxPullClient;
Expand Down Expand Up @@ -54,7 +55,7 @@ public void shouldCopyStoreFilesAndPullTransactions() throws Exception
StoreId storeId = new StoreId( 1, 2, 3, 4 ); StoreId storeId = new StoreId( 1, 2, 3, 4 );
StoreCopyClient storeCopyClient = mock( StoreCopyClient.class ); StoreCopyClient storeCopyClient = mock( StoreCopyClient.class );
TxPullClient txPullClient = mock( TxPullClient.class ); TxPullClient txPullClient = mock( TxPullClient.class );
when( txPullClient.pullTransactions( any(), any(), anyLong(), any() ) ).thenReturn( SUCCESS_END_OF_STREAM ); when( txPullClient.pullTransactions( any(), any(), anyLong(), any() ) ).thenReturn( new TxPullRequestResult( SUCCESS_END_OF_STREAM, 13) );
TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class ); TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class );


StoreFetcher fetcher = new StoreFetcher( NullLogProvider.getInstance(), mock( FileSystemAbstraction.class ), StoreFetcher fetcher = new StoreFetcher( NullLogProvider.getInstance(), mock( FileSystemAbstraction.class ),
Expand Down Expand Up @@ -83,7 +84,7 @@ public void shouldSetLastPulledTransactionId() throws Exception


TxPullClient txPullClient = mock( TxPullClient.class ); TxPullClient txPullClient = mock( TxPullClient.class );
when( txPullClient.pullTransactions( eq( localhost ), eq( wantedStoreId ), anyLong(), any( TxPullResponseListener.class ) ) ) when( txPullClient.pullTransactions( eq( localhost ), eq( wantedStoreId ), anyLong(), any( TxPullResponseListener.class ) ) )
.thenReturn( SUCCESS_END_OF_STREAM ); .thenReturn( new TxPullRequestResult( SUCCESS_END_OF_STREAM, 13) );


TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class ); TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class );


Expand Down

0 comments on commit 32b4a14

Please sign in to comment.