Skip to content

Commit

Permalink
Don't send back a TX that the requester already has
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Needham committed Sep 16, 2016
1 parent ec1c53d commit bcc9157
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 22 deletions.
Expand Up @@ -89,12 +89,13 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) th
log.info( "Copying store from %s", from ); log.info( "Copying store from %s", from );
long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs ) ); long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs ) );


/* Strictly we do not require lastFlushedTxId, but we do not know if a later one exists // We require at least one transaction for extracting the log index of the consensus log.
* and we require at least one transaction usually, e.g. for extracting the log index // Given there might not have been any activity on the source server we need to ask for the
* of the consensus log. */ // log entry for the lastFlushedTxId even though we've already applied its contents
log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId ); long pullTxIndex = lastFlushedTxId - 1;
log.info( "Store files need to be recovered starting from: %d", pullTxIndex );


CatchupResult catchupResult = pullTransactions( from, expectedStoreId, destDir, lastFlushedTxId ); CatchupResult catchupResult = pullTransactions( from, expectedStoreId, destDir, pullTxIndex );
if ( catchupResult != SUCCESS ) if ( catchupResult != SUCCESS )
{ {
throw new StoreCopyFailedException( "Failed to pull transactions: " + catchupResult ); throw new StoreCopyFailedException( "Failed to pull transactions: " + catchupResult );
Expand Down
Expand Up @@ -27,18 +27,21 @@


public class TxPullRequest implements CatchUpRequest public class TxPullRequest implements CatchUpRequest
{ {
private long txId; private long previousTxId;
private final StoreId expectedStoreId; private final StoreId expectedStoreId;


public TxPullRequest( long txId, StoreId expectedStoreId ) public TxPullRequest( long previousTxId, StoreId expectedStoreId )
{ {
this.txId = txId; this.previousTxId = previousTxId;
this.expectedStoreId = expectedStoreId; this.expectedStoreId = expectedStoreId;
} }


public long txId() /**
* Request is for transactions after this id
*/
public long previousTxId()
{ {
return txId; return previousTxId;
} }


public StoreId expectedStoreId() public StoreId expectedStoreId()
Expand All @@ -58,19 +61,19 @@ public boolean equals( Object o )
return false; return false;
} }
TxPullRequest that = (TxPullRequest) o; TxPullRequest that = (TxPullRequest) o;
return txId == that.txId && Objects.equals( expectedStoreId, that.expectedStoreId ); return previousTxId == that.previousTxId && Objects.equals( expectedStoreId, that.expectedStoreId );
} }


@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash( txId, expectedStoreId ); return Objects.hash( previousTxId, expectedStoreId );
} }


@Override @Override
public String toString() public String toString()
{ {
return String.format( "TxPullRequest{txId=%d, storeId=%s}", txId, expectedStoreId ); return String.format( "TxPullRequest{txId=%d, storeId=%s}", previousTxId, expectedStoreId );
} }


@Override @Override
Expand Down
Expand Up @@ -34,7 +34,7 @@ public class TxPullRequestEncoder extends MessageToMessageEncoder<TxPullRequest>
protected void encode( ChannelHandlerContext ctx, TxPullRequest request, List<Object> out ) throws Exception protected void encode( ChannelHandlerContext ctx, TxPullRequest request, List<Object> out ) throws Exception
{ {
ByteBuf encoded = ctx.alloc().buffer(); ByteBuf encoded = ctx.alloc().buffer();
encoded.writeLong( request.txId() ); encoded.writeLong( request.previousTxId() );
StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new NetworkFlushableChannelNetty4( encoded ) ); StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new NetworkFlushableChannelNetty4( encoded ) );
out.add( encoded ); out.add( encoded );
} }
Expand Down
Expand Up @@ -69,7 +69,7 @@ public TxPullRequestHandler( CatchupServerProtocol protocol,
@Override @Override
protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg ) throws Exception protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg ) throws Exception
{ {
long firstTxId = Math.max( msg.txId(), BASE_TX_ID + 1 ); long firstTxId = Math.max( msg.previousTxId(), BASE_TX_ID ) + 1;
long lastTxId = firstTxId; long lastTxId = firstTxId;
CatchupResult status = SUCCESS; CatchupResult status = SUCCESS;
StoreId localStoreId = storeIdSupplier.get(); StoreId localStoreId = storeIdSupplier.get();
Expand Down
Expand Up @@ -94,7 +94,7 @@ public void shouldSetLastPulledTransactionId() throws Exception
fetcher.copyStore( localhost, wantedStoreId, new File( "destination" ) ); fetcher.copyStore( localhost, wantedStoreId, new File( "destination" ) );


// then // then
verify( txPullClient ).pullTransactions( eq( localhost ), eq( wantedStoreId ), eq( lastFlushedTxId ), any( TxPullResponseListener.class ) ); verify( txPullClient ).pullTransactions( eq( localhost ), eq( wantedStoreId ), eq( lastFlushedTxId - 1 ), any( TxPullResponseListener.class ) );
} }


@Test @Test
Expand Down
Expand Up @@ -62,8 +62,7 @@ public void shouldRespondWithStreamOfTransactions() throws Exception
when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L ); when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L );


LogicalTransactionStore logicalTransactionStore = mock( LogicalTransactionStore.class ); LogicalTransactionStore logicalTransactionStore = mock( LogicalTransactionStore.class );
when( logicalTransactionStore.getTransactions( 13L ) ).thenReturn( txCursor( cursor( when( logicalTransactionStore.getTransactions( 14L ) ).thenReturn( txCursor( cursor(
tx( 13 ),
tx( 14 ), tx( 14 ),
tx( 15 ) tx( 15 )
) ) ); ) ) );
Expand All @@ -77,8 +76,7 @@ public void shouldRespondWithStreamOfTransactions() throws Exception
txPullRequestHandler.channelRead0( context, new TxPullRequest( 13, storeId ) ); txPullRequestHandler.channelRead0( context, new TxPullRequest( 13, storeId ) );


// then // then
verify( context, times( 3 ) ).write( ResponseMessageType.TX ); verify( context, times( 2 ) ).write( ResponseMessageType.TX );
verify( context ).write( new TxPullResponse( storeId, tx( 13 ) ) );
verify( context ).write( new TxPullResponse( storeId, tx( 14 ) ) ); verify( context ).write( new TxPullResponse( storeId, tx( 14 ) ) );
verify( context ).write( new TxPullResponse( storeId, tx( 15 ) ) ); verify( context ).write( new TxPullResponse( storeId, tx( 15 ) ) );


Expand All @@ -96,7 +94,7 @@ public void shouldRespondWithoutTransactionsIfTheyDoNotExist() throws Exception
when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L ); when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L );


LogicalTransactionStore logicalTransactionStore = mock( LogicalTransactionStore.class ); LogicalTransactionStore logicalTransactionStore = mock( LogicalTransactionStore.class );
when( logicalTransactionStore.getTransactions( 13L ) ).thenThrow( new NoSuchTransactionException( 13 ) ); when( logicalTransactionStore.getTransactions( 14L ) ).thenThrow( new NoSuchTransactionException( 14 ) );


AssertableLogProvider logProvider = new AssertableLogProvider(); AssertableLogProvider logProvider = new AssertableLogProvider();
TxPullRequestHandler txPullRequestHandler = new TxPullRequestHandler( new CatchupServerProtocol(), TxPullRequestHandler txPullRequestHandler = new TxPullRequestHandler( new CatchupServerProtocol(),
Expand All @@ -112,7 +110,7 @@ public void shouldRespondWithoutTransactionsIfTheyDoNotExist() throws Exception
verify( context ).write( ResponseMessageType.TX_STREAM_FINISHED ); verify( context ).write( ResponseMessageType.TX_STREAM_FINISHED );
verify( context ).write( new TxStreamFinishedResponse( E_TRANSACTION_PRUNED ) ); verify( context ).write( new TxStreamFinishedResponse( E_TRANSACTION_PRUNED ) );
logProvider.assertAtLeastOnce( inLog( TxPullRequestHandler.class ) logProvider.assertAtLeastOnce( inLog( TxPullRequestHandler.class )
.info( "Failed to serve TxPullRequest for tx %d because the transaction does not exist.", 13L ) ); .info( "Failed to serve TxPullRequest for tx %d because the transaction does not exist.", 14L ) );
} }


@Test @Test
Expand Down

0 comments on commit bcc9157

Please sign in to comment.