diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/ChunkedTransactionStream.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/ChunkedTransactionStream.java index 03985dd986f06..cb534d99932b3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/ChunkedTransactionStream.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/ChunkedTransactionStream.java @@ -29,6 +29,7 @@ import org.neo4j.cursor.IOCursor; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; +import static java.lang.String.format; import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM; /** @@ -42,13 +43,15 @@ public class ChunkedTransactionStream implements ChunkedInput private boolean endOfInput; private boolean noMoreTransactions; + private long expectedTxId; private long lastTxId; private Object pending; - ChunkedTransactionStream( StoreId storeId, IOCursor txCursor, CatchupServerProtocol protocol ) + ChunkedTransactionStream( StoreId storeId, long firstTxId, IOCursor txCursor, CatchupServerProtocol protocol ) { this.storeId = storeId; + this.expectedTxId = firstTxId; this.txCursor = txCursor; this.protocol = protocol; } @@ -96,6 +99,12 @@ else if ( txCursor.next() ) CommittedTransactionRepresentation tx = txCursor.get(); lastTxId = tx.getCommitEntry().getTxId(); + if ( lastTxId != expectedTxId ) + { + String msg = format( "Transaction cursor out of order. Expected %d but was %d", expectedTxId, lastTxId ); + throw new IllegalStateException( msg ); + } + expectedTxId++; pending = new TxPullResponse( storeId, tx ); return ResponseMessageType.TX; } @@ -129,4 +138,9 @@ public long progress() { return 0; } + + public long lastTxId() + { + return lastTxId; + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandler.java index 5ff3c4d8fb9b4..ee57e223715fb 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandler.java @@ -40,6 +40,7 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import static java.lang.String.format; import static org.neo4j.causalclustering.catchup.CatchupResult.E_INVALID_REQUEST; import static org.neo4j.causalclustering.catchup.CatchupResult.E_STORE_ID_MISMATCH; import static org.neo4j.causalclustering.catchup.CatchupResult.E_STORE_UNAVAILABLE; @@ -85,12 +86,25 @@ protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg StoreId localStoreId = storeIdSupplier.get(); StoreId expectedStoreId = msg.expectedStoreId(); - IOCursor txCursor = getCursor( ctx, msg.previousTxId() + 1, localStoreId, expectedStoreId ); + long firstTxId = msg.previousTxId() + 1; + IOCursor txCursor = getCursor( ctx, firstTxId, localStoreId, expectedStoreId ); if ( txCursor != null ) { - ctx.writeAndFlush( new ChunkedTransactionStream( localStoreId, txCursor, protocol ) ); + ChunkedTransactionStream txStream = new ChunkedTransactionStream( localStoreId, firstTxId, txCursor, protocol ); // chunked transaction stream ends the interaction internally and closes the cursor + ctx.writeAndFlush( txStream ).addListener( f -> + { + String message = format( "Streamed transactions [%d--%d] to %s", firstTxId, txStream.lastTxId(), ctx.channel().remoteAddress() ); + if ( f.isSuccess() ) + { + log.info( message ); + } + else + { + log.warn( message, f.cause() ); + } + } ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/ChunkedTransactionStreamTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/ChunkedTransactionStreamTest.java index ea2d72590c513..bddbaa621ec90 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/ChunkedTransactionStreamTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/ChunkedTransactionStreamTest.java @@ -47,7 +47,7 @@ public void shouldStreamTransactions() throws Exception StoreId storeId = StoreId.DEFAULT; @SuppressWarnings( "unchecked" ) IOCursor cursor = mock( IOCursor.class ); - ChunkedTransactionStream txStream = new ChunkedTransactionStream( storeId, cursor, mock( CatchupServerProtocol.class ) ); + ChunkedTransactionStream txStream = new ChunkedTransactionStream( storeId, BASE_TX_ID + 1, cursor, mock( CatchupServerProtocol.class ) ); ByteBufAllocator allocator = mock( ByteBufAllocator.class ); CommittedTransactionRepresentation tx1 = tx( BASE_TX_ID + 1 ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandlerTest.java index ae337abb1b319..a65ef7065e4f0 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandlerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandlerTest.java @@ -19,11 +19,10 @@ */ package org.neo4j.causalclustering.catchup.tx; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import org.junit.Test; -import java.io.IOException; - import org.neo4j.causalclustering.catchup.CatchupServerProtocol; import org.neo4j.causalclustering.catchup.ResponseMessageType; import org.neo4j.causalclustering.identity.StoreId; @@ -40,6 +39,7 @@ import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.AssertableLogProvider; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -71,6 +71,7 @@ public void shouldRespondWithCompleteStreamOfTransactions() throws Exception // given when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L ); when( logicalTransactionStore.getTransactions( 14L ) ).thenReturn( txCursor( cursor( tx( 14 ), tx( 15 ) ) ) ); + when( context.writeAndFlush( any() ) ).thenReturn( mock( ChannelFuture.class ) ); // when txPullRequestHandler.channelRead0( context, new TxPullRequest( 13, storeId ) );