Skip to content

Commit

Permalink
Assertions and logs for server side transaction streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Feb 27, 2018
1 parent 0079c91 commit 4fb5ddd
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 6 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;

import static java.lang.String.format;
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM;

/**
Expand All @@ -42,13 +43,15 @@ public class ChunkedTransactionStream implements ChunkedInput<Object>

private boolean endOfInput;
private boolean noMoreTransactions;
private long expectedTxId;
private long lastTxId;

private Object pending;

ChunkedTransactionStream( StoreId storeId, IOCursor<CommittedTransactionRepresentation> txCursor, CatchupServerProtocol protocol )
ChunkedTransactionStream( StoreId storeId, long firstTxId, IOCursor<CommittedTransactionRepresentation> txCursor, CatchupServerProtocol protocol )
{
this.storeId = storeId;
this.expectedTxId = firstTxId;
this.txCursor = txCursor;
this.protocol = protocol;
}
Expand Down Expand Up @@ -96,6 +99,12 @@ else if ( txCursor.next() )

CommittedTransactionRepresentation tx = txCursor.get();
lastTxId = tx.getCommitEntry().getTxId();
if ( lastTxId != expectedTxId )
{
String msg = format( "Transaction cursor out of order. Expected %d but was %d", expectedTxId, lastTxId );
throw new IllegalStateException( msg );
}
expectedTxId++;
pending = new TxPullResponse( storeId, tx );
return ResponseMessageType.TX;
}
Expand Down Expand Up @@ -129,4 +138,9 @@ public long progress()
{
return 0;
}

public long lastTxId()
{
return lastTxId;
}
}
Expand Up @@ -40,6 +40,7 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;
import static org.neo4j.causalclustering.catchup.CatchupResult.E_INVALID_REQUEST;
import static org.neo4j.causalclustering.catchup.CatchupResult.E_STORE_ID_MISMATCH;
import static org.neo4j.causalclustering.catchup.CatchupResult.E_STORE_UNAVAILABLE;
Expand Down Expand Up @@ -85,12 +86,25 @@ protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg
StoreId localStoreId = storeIdSupplier.get();
StoreId expectedStoreId = msg.expectedStoreId();

IOCursor<CommittedTransactionRepresentation> txCursor = getCursor( ctx, msg.previousTxId() + 1, localStoreId, expectedStoreId );
long firstTxId = msg.previousTxId() + 1;
IOCursor<CommittedTransactionRepresentation> txCursor = getCursor( ctx, firstTxId, localStoreId, expectedStoreId );

if ( txCursor != null )
{
ctx.writeAndFlush( new ChunkedTransactionStream( localStoreId, txCursor, protocol ) );
ChunkedTransactionStream txStream = new ChunkedTransactionStream( localStoreId, firstTxId, txCursor, protocol );
// chunked transaction stream ends the interaction internally and closes the cursor
ctx.writeAndFlush( txStream ).addListener( f ->
{
String message = format( "Streamed transactions [%d--%d] to %s", firstTxId, txStream.lastTxId(), ctx.channel().remoteAddress() );
if ( f.isSuccess() )
{
log.info( message );
}
else
{
log.warn( message, f.cause() );
}
} );
}
}

Expand Down
Expand Up @@ -47,7 +47,7 @@ public void shouldStreamTransactions() throws Exception
StoreId storeId = StoreId.DEFAULT;
@SuppressWarnings( "unchecked" )
IOCursor<CommittedTransactionRepresentation> cursor = mock( IOCursor.class );
ChunkedTransactionStream txStream = new ChunkedTransactionStream( storeId, cursor, mock( CatchupServerProtocol.class ) );
ChunkedTransactionStream txStream = new ChunkedTransactionStream( storeId, BASE_TX_ID + 1, cursor, mock( CatchupServerProtocol.class ) );
ByteBufAllocator allocator = mock( ByteBufAllocator.class );

CommittedTransactionRepresentation tx1 = tx( BASE_TX_ID + 1 );
Expand Down
Expand Up @@ -19,11 +19,10 @@
*/
package org.neo4j.causalclustering.catchup.tx;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import org.junit.Test;

import java.io.IOException;

import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.identity.StoreId;
Expand All @@ -40,6 +39,7 @@
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.AssertableLogProvider;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -71,6 +71,7 @@ public void shouldRespondWithCompleteStreamOfTransactions() throws Exception
// given
when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L );
when( logicalTransactionStore.getTransactions( 14L ) ).thenReturn( txCursor( cursor( tx( 14 ), tx( 15 ) ) ) );
when( context.writeAndFlush( any() ) ).thenReturn( mock( ChannelFuture.class ) );

// when
txPullRequestHandler.channelRead0( context, new TxPullRequest( 13, storeId ) );
Expand Down

0 comments on commit 4fb5ddd

Please sign in to comment.