Skip to content

Commit

Permalink
Fixing flaky
Browse files Browse the repository at this point in the history
CoreToCoreCopySnapshotIT.shouldBeAbleToDownloadToNewInstanceAfterPruning

If the last entry in a snapshot was not a transaction then the
CoreState would try to apply an entry from before the start of
the log.

Now we tell CoreState#skip that is has already applied up to the index
of the snapshot, rather than relying on the index from the transaction
log.
  • Loading branch information
Mark Needham committed Jun 6, 2016
1 parent 1869870 commit 34f3de2
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 16 deletions.
Expand Up @@ -93,9 +93,13 @@ public CoreState( RaftLog raftLog, int flushEvery, Supplier<DatabaseHealth> dbHe
this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() ); this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() );
} }


synchronized void setStateMachine( CoreStateMachines coreStateMachines, long lastApplied ) synchronized void setStateMachine( CoreStateMachines coreStateMachines )
{ {
this.coreStateMachines = coreStateMachines; this.coreStateMachines = coreStateMachines;
}

public void skip( long lastApplied )
{
this.lastApplied = this.lastFlushed = lastApplied; this.lastApplied = this.lastFlushed = lastApplied;
} }


Expand Down
Expand Up @@ -141,6 +141,7 @@ void installSnapshots( CoreSnapshot coreSnapshot )
{ {
throw new RuntimeException( e ); throw new RuntimeException( e );
} }
coreState.skip( snapshotPrevIndex );
} }


public void refresh( TransactionRepresentationCommitProcess localCommit ) public void refresh( TransactionRepresentationCommitProcess localCommit )
Expand All @@ -152,6 +153,6 @@ public void refresh( TransactionRepresentationCommitProcess localCommit )
relationshipTypeTokenStateMachine.installCommitProcess( localCommit, lastAppliedIndex ); relationshipTypeTokenStateMachine.installCommitProcess( localCommit, lastAppliedIndex );
propertyKeyTokenStateMachine.installCommitProcess( localCommit, lastAppliedIndex ); propertyKeyTokenStateMachine.installCommitProcess( localCommit, lastAppliedIndex );


coreState.setStateMachine( this, lastAppliedIndex ); coreState.setStateMachine( this );
} }
} }
Expand Up @@ -115,7 +115,7 @@ public void shouldApplyCommittedCommand() throws Exception
// given // given
RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class ); RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class );
monitors.addMonitorListener( listener ); monitors.addMonitorListener( listener );
coreState.setStateMachine( txStateMachine, -1 ); coreState.setStateMachine( txStateMachine );
coreState.start(); coreState.start();


// when // when
Expand All @@ -136,7 +136,7 @@ public void shouldApplyCommittedCommand() throws Exception
public void shouldNotApplyUncommittedCommands() throws Exception public void shouldNotApplyUncommittedCommands() throws Exception
{ {
// given // given
coreState.setStateMachine( txStateMachine, -1 ); coreState.setStateMachine( txStateMachine );
coreState.start(); coreState.start();


// when // when
Expand All @@ -153,7 +153,7 @@ public void shouldNotApplyUncommittedCommands() throws Exception
public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex() throws Exception public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex() throws Exception
{ {
// given // given
coreState.setStateMachine( txStateMachine, -1 ); coreState.setStateMachine( txStateMachine );
coreState.start(); coreState.start();


// when // when
Expand All @@ -172,7 +172,7 @@ public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex
public void shouldPeriodicallyFlushState() throws Exception public void shouldPeriodicallyFlushState() throws Exception
{ {
// given // given
coreState.setStateMachine( txStateMachine, -1 ); coreState.setStateMachine( txStateMachine );
coreState.start(); coreState.start();


int TIMES = 5; int TIMES = 5;
Expand All @@ -194,7 +194,7 @@ public void shouldPeriodicallyFlushState() throws Exception
public void shouldPanicIfUnableToApply() throws Exception public void shouldPanicIfUnableToApply() throws Exception
{ {
// given // given
coreState.setStateMachine( failingTxStateMachine, -1 ); coreState.setStateMachine( failingTxStateMachine );
coreState.start(); coreState.start();


raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) );
Expand All @@ -214,7 +214,7 @@ public void shouldApplyToLogFromCache() throws Throwable
//given n things to apply in the cache, check that they are actually applied. //given n things to apply in the cache, check that they are actually applied.


// given // given
coreState.setStateMachine( txStateMachine, -1 ); coreState.setStateMachine( txStateMachine );
coreState.start(); coreState.start();


inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) );
Expand All @@ -234,7 +234,7 @@ public void cacheEntryShouldBePurgedWhenApplied() throws Throwable
//given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things" //given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things"


// given // given
coreState.setStateMachine( txStateMachine, -1 ); coreState.setStateMachine( txStateMachine );
coreState.start(); coreState.start();


inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) );
Expand All @@ -257,7 +257,7 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable
// if the cache does not contain all things to be applied, make sure we fall back to the log // if the cache does not contain all things to be applied, make sure we fall back to the log
// should only happen in recovery, otherwise this is probably a bug. // should only happen in recovery, otherwise this is probably a bug.


coreState.setStateMachine( txStateMachine, -1 ); coreState.setStateMachine( txStateMachine );
coreState.start(); coreState.start();


//given cache with missing entry //given cache with missing entry
Expand Down Expand Up @@ -293,7 +293,7 @@ public void shouldFailWhenCacheAndLogMiss() throws Exception
{ {
//When an entry is not in the log, we must fail. //When an entry is not in the log, we must fail.


coreState.setStateMachine( txStateMachine, -1 ); coreState.setStateMachine( txStateMachine );
coreState.start(); coreState.start();


inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) );
Expand Down
Expand Up @@ -144,7 +144,7 @@ public void shouldBeAbleToDownloadToNewInstanceAfterPruning() throws Exception


cluster = Cluster.start( dbDir, 3, 0, params, new TestOnlyDiscoveryServiceFactory() ); cluster = Cluster.start( dbDir, 3, 0, params, new TestOnlyDiscoveryServiceFactory() );


CoreGraphDatabase source = cluster.coreTx( ( db, tx ) -> { CoreGraphDatabase leader = cluster.coreTx( ( db, tx ) -> {
createData( db, 10000 ); createData( db, 10000 );
tx.success(); tx.success();
} ); } );
Expand All @@ -155,12 +155,15 @@ public void shouldBeAbleToDownloadToNewInstanceAfterPruning() throws Exception
coreDb.compact(); coreDb.compact();
} }


cluster.removeCoreServer( leader ); // to force a change of leader
leader = cluster.awaitLeader();

int newDbId = 3; int newDbId = 3;
cluster.addCoreServerWithServerId( newDbId, 4 ); cluster.addCoreServerWithServerId( newDbId, 3 );
CoreGraphDatabase newDb = cluster.getCoreServerById( 3 ); CoreGraphDatabase newDb = cluster.getCoreServerById( newDbId );


// then // then
assertEquals( DbRepresentation.of( source ), DbRepresentation.of( newDb ) ); assertEquals( DbRepresentation.of( leader ), DbRepresentation.of( newDb ) );
} }


@Test @Test
Expand Down Expand Up @@ -267,7 +270,7 @@ private void doSomeTransactions( Cluster cluster, int count )


private String string( int numberOfCharacters ) private String string( int numberOfCharacters )
{ {
StringBuffer s = new StringBuffer(); StringBuilder s = new StringBuilder();
for ( int i = 0; i < numberOfCharacters; i++ ) for ( int i = 0; i < numberOfCharacters; i++ )
{ {
s.append( String.valueOf( i ) ); s.append( String.valueOf( i ) );
Expand Down

0 comments on commit 34f3de2

Please sign in to comment.