Skip to content

Commit

Permalink
Make sure we stream transactions as far as is required or fail
Browse files Browse the repository at this point in the history
This is a rare race which can occur when transaction log pruning
comes in while an iteration over transactions spanning several
files is in progress. If the pruner deletes files ahead of us
then the kernel transaction cursor will not notice, which
previously would lead us to fall short. A safety check is now
added which works regardless of cursor behaviour.
  • Loading branch information
martinfurmanski committed Sep 27, 2018
1 parent c57e6a8 commit b263fbc
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 34 deletions.
Expand Up @@ -26,23 +26,28 @@
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput; import io.netty.handler.stream.ChunkedInput;


import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol; import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.ResponseMessageType; import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.cursor.IOCursor; import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.logging.Log;


import static java.lang.String.format; import static java.lang.String.format;
import static org.neo4j.causalclustering.catchup.CatchupResult.E_TRANSACTION_PRUNED;
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM; import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM;


/** /**
* Returns a chunked stream of transactions. * Returns a chunked stream of transactions.
*/ */
public class ChunkedTransactionStream implements ChunkedInput<Object> public class ChunkedTransactionStream implements ChunkedInput<Object>
{ {
private final Log log;
private final StoreId storeId; private final StoreId storeId;
private final IOCursor<CommittedTransactionRepresentation> txCursor; private final IOCursor<CommittedTransactionRepresentation> txCursor;
private final CatchupServerProtocol protocol; private final CatchupServerProtocol protocol;
private final long txIdPromise;


private boolean endOfInput; private boolean endOfInput;
private boolean noMoreTransactions; private boolean noMoreTransactions;
Expand All @@ -51,10 +56,13 @@ public class ChunkedTransactionStream implements ChunkedInput<Object>


private Object pending; private Object pending;


ChunkedTransactionStream( StoreId storeId, long firstTxId, IOCursor<CommittedTransactionRepresentation> txCursor, CatchupServerProtocol protocol ) ChunkedTransactionStream( Log log, StoreId storeId, long firstTxId, long txIdPromise, IOCursor<CommittedTransactionRepresentation> txCursor,
CatchupServerProtocol protocol )
{ {
this.log = log;
this.storeId = storeId; this.storeId = storeId;
this.expectedTxId = firstTxId; this.expectedTxId = firstTxId;
this.txIdPromise = txIdPromise;
this.txCursor = txCursor; this.txCursor = txCursor;
this.protocol = protocol; this.protocol = protocol;
} }
Expand Down Expand Up @@ -117,8 +125,17 @@ else if ( txCursor.next() )


noMoreTransactions = true; noMoreTransactions = true;
protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE );

CatchupResult result;
pending = new TxStreamFinishedResponse( SUCCESS_END_OF_STREAM, lastTxId ); if ( lastTxId >= txIdPromise )
{
result = SUCCESS_END_OF_STREAM;
}
else
{
result = E_TRANSACTION_PRUNED;
log.warn( "Transaction cursor fell short. Expected at least %d but only got to %d.", txIdPromise, lastTxId );
}
pending = new TxStreamFinishedResponse( result, lastTxId );
return ResponseMessageType.TX_STREAM_FINISHED; return ResponseMessageType.TX_STREAM_FINISHED;
} }
} }
Expand Down
Expand Up @@ -91,11 +91,20 @@ protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg
StoreId expectedStoreId = msg.expectedStoreId(); StoreId expectedStoreId = msg.expectedStoreId();


long firstTxId = msg.previousTxId() + 1; long firstTxId = msg.previousTxId() + 1;
IOCursor<CommittedTransactionRepresentation> txCursor = getCursor( ctx, firstTxId, localStoreId, expectedStoreId );
/*
* This is the minimum transaction id we must send to consider our streaming operation successful. The kernel can
* concurrently prune even future transactions while iterating and the cursor will silently fail on iteration, so
* we need to add our own protection for this reason and also as a generally important sanity check for the fulfillment
* of the consistent recovery contract which requires us to stream transactions at least as far as the time when the
* file copy operation completed.
*/
long txIdPromise = transactionIdStore.getLastCommittedTransactionId();
IOCursor<CommittedTransactionRepresentation> txCursor = getCursor( txIdPromise, ctx, firstTxId, localStoreId, expectedStoreId );


if ( txCursor != null ) if ( txCursor != null )
{ {
ChunkedTransactionStream txStream = new ChunkedTransactionStream( localStoreId, firstTxId, txCursor, protocol ); ChunkedTransactionStream txStream = new ChunkedTransactionStream( log, localStoreId, firstTxId, txIdPromise, txCursor, protocol );
// chunked transaction stream ends the interaction internally and closes the cursor // chunked transaction stream ends the interaction internally and closes the cursor
ctx.writeAndFlush( txStream ).addListener( f -> ctx.writeAndFlush( txStream ).addListener( f ->
{ {
Expand All @@ -115,27 +124,25 @@ protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg
} }
} }


private IOCursor<CommittedTransactionRepresentation> getCursor( ChannelHandlerContext ctx, long firstTxId, private IOCursor<CommittedTransactionRepresentation> getCursor( long txIdPromise, ChannelHandlerContext ctx, long firstTxId,
StoreId localStoreId, StoreId expectedStoreId ) throws IOException StoreId localStoreId, StoreId expectedStoreId ) throws IOException
{ {
long lastCommittedTransactionId = transactionIdStore.getLastCommittedTransactionId();

if ( localStoreId == null || !localStoreId.equals( expectedStoreId ) ) if ( localStoreId == null || !localStoreId.equals( expectedStoreId ) )
{ {
log.info( "Failed to serve TxPullRequest for tx %d and storeId %s because that storeId is different " + log.info( "Failed to serve TxPullRequest for tx %d and storeId %s because that storeId is different " +
"from this machine with %s", firstTxId, expectedStoreId, localStoreId ); "from this machine with %s", firstTxId, expectedStoreId, localStoreId );
endInteraction( ctx, E_STORE_ID_MISMATCH, lastCommittedTransactionId ); endInteraction( ctx, E_STORE_ID_MISMATCH, txIdPromise );
return null; return null;
} }
else if ( !databaseAvailable.getAsBoolean() ) else if ( !databaseAvailable.getAsBoolean() )
{ {
log.info( "Failed to serve TxPullRequest for tx %d because the local database is unavailable.", firstTxId ); log.info( "Failed to serve TxPullRequest for tx %d because the local database is unavailable.", firstTxId );
endInteraction( ctx, E_STORE_UNAVAILABLE, lastCommittedTransactionId ); endInteraction( ctx, E_STORE_UNAVAILABLE, txIdPromise );
return null; return null;
} }
else if ( lastCommittedTransactionId < firstTxId ) else if ( txIdPromise < firstTxId )
{ {
endInteraction( ctx, SUCCESS_END_OF_STREAM, lastCommittedTransactionId ); endInteraction( ctx, SUCCESS_END_OF_STREAM, txIdPromise );
return null; return null;
} }


Expand All @@ -146,7 +153,7 @@ else if ( lastCommittedTransactionId < firstTxId )
catch ( NoSuchTransactionException e ) catch ( NoSuchTransactionException e )
{ {
log.info( "Failed to serve TxPullRequest for tx %d because the transaction does not exist.", firstTxId ); log.info( "Failed to serve TxPullRequest for tx %d because the transaction does not exist.", firstTxId );
endInteraction( ctx, E_TRANSACTION_PRUNED, lastCommittedTransactionId ); endInteraction( ctx, E_TRANSACTION_PRUNED, txIdPromise );
return null; return null;
} }
} }
Expand Down
Expand Up @@ -25,64 +25,124 @@
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import org.junit.Test; import org.junit.Test;


import java.util.ArrayList;
import java.util.List;

import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol; import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.ResponseMessageType; import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.cursor.IOCursor; import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.logging.NullLog;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalAnswers.returnsElementsOf;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.causalclustering.catchup.CatchupResult.E_TRANSACTION_PRUNED;
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM; import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;


@SuppressWarnings( {"unchecked", "UnnecessaryLocalVariable"} )
public class ChunkedTransactionStreamTest public class ChunkedTransactionStreamTest
{ {
private final StoreId storeId = StoreId.DEFAULT;
private final ByteBufAllocator allocator = mock( ByteBufAllocator.class );
private final CatchupServerProtocol protocol = mock( CatchupServerProtocol.class );
private final IOCursor<CommittedTransactionRepresentation> cursor = mock( IOCursor.class );
private final int baseTxId = (int) BASE_TX_ID;

@Test @Test
public void shouldStreamTransactions() throws Exception public void shouldSucceedExactNumberOfTransactions() throws Exception
{ {
// given int firstTxId = baseTxId;
StoreId storeId = StoreId.DEFAULT; int lastTxId = 10;
@SuppressWarnings( "unchecked" ) int txIdPromise = 10;
IOCursor<CommittedTransactionRepresentation> cursor = mock( IOCursor.class ); testTransactionStream( firstTxId, lastTxId, txIdPromise, SUCCESS_END_OF_STREAM );
ChunkedTransactionStream txStream = new ChunkedTransactionStream( storeId, BASE_TX_ID + 1, cursor, mock( CatchupServerProtocol.class ) ); }
ByteBufAllocator allocator = mock( ByteBufAllocator.class );


CommittedTransactionRepresentation tx1 = tx( BASE_TX_ID + 1 ); @Test
CommittedTransactionRepresentation tx2 = tx( BASE_TX_ID + 2 ); public void shouldSucceedWithNoTransactions() throws Exception
CommittedTransactionRepresentation tx3 = tx( BASE_TX_ID + 3 ); {
long lastTxId = BASE_TX_ID + 3; int firstTxId = baseTxId;
int lastTxId = baseTxId;
int txIdPromise = baseTxId;
testTransactionStream( firstTxId, lastTxId, txIdPromise, SUCCESS_END_OF_STREAM );
}


when( cursor.next() ).thenReturn( true, true, true, false ); @Test
when( cursor.get() ).thenReturn( tx1, tx2, tx3 ); public void shouldSucceedExcessiveNumberOfTransactions() throws Exception
{
int firstTxId = baseTxId;
int lastTxId = 10;
int txIdPromise = 9;
testTransactionStream( firstTxId, lastTxId, txIdPromise, SUCCESS_END_OF_STREAM );
}

@Test
public void shouldFailIncompleteStreamOfTransactions() throws Exception
{
int firstTxId = baseTxId;
int lastTxId = 10;
int txIdPromise = 11;
testTransactionStream( firstTxId, lastTxId, txIdPromise, E_TRANSACTION_PRUNED );
}

@Test
public void shouldSucceedLargeNumberOfTransactions() throws Exception
{
int firstTxId = baseTxId;
int lastTxId = 1000;
int txIdPromise = 900;
testTransactionStream( firstTxId, lastTxId, txIdPromise, SUCCESS_END_OF_STREAM );
}

@SuppressWarnings( "SameParameterValue" )
private void testTransactionStream( int firstTxId, int lastTxId, int txIdPromise, CatchupResult expectedResult ) throws Exception
{
ChunkedTransactionStream txStream = new ChunkedTransactionStream( NullLog.getInstance(), storeId, firstTxId, txIdPromise, cursor, protocol );

List<Boolean> more = new ArrayList<>();
List<CommittedTransactionRepresentation> txs = new ArrayList<>();

for ( int txId = firstTxId; txId <= lastTxId; txId++ )
{
more.add( true );
txs.add( tx( txId ) );
}
txs.add( null );
more.add( false );

when( cursor.next() ).thenAnswer( returnsElementsOf( more ) );
when( cursor.get() ).thenAnswer( returnsElementsOf( txs ) );


// when/then // when/then
assertFalse( txStream.isEndOfInput() ); assertFalse( txStream.isEndOfInput() );


assertEquals( ResponseMessageType.TX, txStream.readChunk( allocator ) ); for ( int txId = firstTxId; txId <= lastTxId; txId++ )
assertEquals( new TxPullResponse( storeId, tx1 ), txStream.readChunk( allocator ) ); {
assertEquals( ResponseMessageType.TX, txStream.readChunk( allocator ) ); assertEquals( ResponseMessageType.TX, txStream.readChunk( allocator ) );
assertEquals( new TxPullResponse( storeId, tx2 ), txStream.readChunk( allocator ) ); assertEquals( new TxPullResponse( storeId, txs.get( txId - firstTxId ) ), txStream.readChunk( allocator ) );
assertEquals( ResponseMessageType.TX, txStream.readChunk( allocator ) ); }
assertEquals( new TxPullResponse( storeId, tx3 ), txStream.readChunk( allocator ) );


assertEquals( ResponseMessageType.TX_STREAM_FINISHED, txStream.readChunk( allocator ) ); assertEquals( ResponseMessageType.TX_STREAM_FINISHED, txStream.readChunk( allocator ) );
assertEquals( new TxStreamFinishedResponse( SUCCESS_END_OF_STREAM, lastTxId ), txStream.readChunk( allocator ) ); assertEquals( new TxStreamFinishedResponse( expectedResult, lastTxId ), txStream.readChunk( allocator ) );


assertTrue( txStream.isEndOfInput() ); assertTrue( txStream.isEndOfInput() );


// when // when
txStream.close(); txStream.close();

// then // then
verify( cursor ).close(); verify( cursor ).close();
} }


private CommittedTransactionRepresentation tx( long txId ) private CommittedTransactionRepresentation tx( int txId )
{ {
CommittedTransactionRepresentation tx = mock( CommittedTransactionRepresentation.class ); CommittedTransactionRepresentation tx = mock( CommittedTransactionRepresentation.class );
when( tx.getCommitEntry() ).thenReturn( new LogEntryCommit( txId, 0 ) ); when( tx.getCommitEntry() ).thenReturn( new LogEntryCommit( txId, 0 ) );
Expand Down

0 comments on commit b263fbc

Please sign in to comment.