From 34f3de2f49105e9526c987eae7feabf34372dd1d Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Mon, 6 Jun 2016 15:13:07 +0100 Subject: [PATCH] Fixing flaky CoreToCoreCopySnapshotIT.shouldBeAbleToDownloadToNewInstanceAfterPruning If the last entry in a snapshot was not a transaction then the CoreState would try to apply an entry from before the start of the log. Now we tell CoreState#skip that it has already applied up to the index of the snapshot, rather than relying on the index from the transaction log. --- .../neo4j/coreedge/raft/state/CoreState.java | 6 +++++- .../coreedge/raft/state/CoreStateMachines.java | 3 ++- .../coreedge/raft/state/CoreStateTest.java | 18 +++++++++--------- .../scenarios/CoreToCoreCopySnapshotIT.java | 13 ++++++++----- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java index 63fb3cc2234dc..55c1c54377746 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java @@ -93,9 +93,13 @@ public CoreState( RaftLog raftLog, int flushEvery, Supplier dbHe this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() ); } - synchronized void setStateMachine( CoreStateMachines coreStateMachines, long lastApplied ) + synchronized void setStateMachine( CoreStateMachines coreStateMachines ) { this.coreStateMachines = coreStateMachines; + } + + public void skip( long lastApplied ) + { this.lastApplied = this.lastFlushed = lastApplied; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java index bf43f12a6381c..d2fbcfe065ae2 100644 --- 
a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java @@ -141,6 +141,7 @@ void installSnapshots( CoreSnapshot coreSnapshot ) { throw new RuntimeException( e ); } + coreState.skip( snapshotPrevIndex ); } public void refresh( TransactionRepresentationCommitProcess localCommit ) @@ -152,6 +153,6 @@ public void refresh( TransactionRepresentationCommitProcess localCommit ) relationshipTypeTokenStateMachine.installCommitProcess( localCommit, lastAppliedIndex ); propertyKeyTokenStateMachine.installCommitProcess( localCommit, lastAppliedIndex ); - coreState.setStateMachine( this, lastAppliedIndex ); + coreState.setStateMachine( this ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java index 379f3d993395e..f6cab0ca67623 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java @@ -115,7 +115,7 @@ public void shouldApplyCommittedCommand() throws Exception // given RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class ); monitors.addMonitorListener( listener ); - coreState.setStateMachine( txStateMachine, -1 ); + coreState.setStateMachine( txStateMachine ); coreState.start(); // when @@ -136,7 +136,7 @@ public void shouldApplyCommittedCommand() throws Exception public void shouldNotApplyUncommittedCommands() throws Exception { // given - coreState.setStateMachine( txStateMachine, -1 ); + coreState.setStateMachine( txStateMachine ); coreState.start(); // when @@ -153,7 +153,7 @@ public void shouldNotApplyUncommittedCommands() throws Exception public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex() throws Exception { // given - 
coreState.setStateMachine( txStateMachine, -1 ); + coreState.setStateMachine( txStateMachine ); coreState.start(); // when @@ -172,7 +172,7 @@ public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex public void shouldPeriodicallyFlushState() throws Exception { // given - coreState.setStateMachine( txStateMachine, -1 ); + coreState.setStateMachine( txStateMachine ); coreState.start(); int TIMES = 5; @@ -194,7 +194,7 @@ public void shouldPeriodicallyFlushState() throws Exception public void shouldPanicIfUnableToApply() throws Exception { // given - coreState.setStateMachine( failingTxStateMachine, -1 ); + coreState.setStateMachine( failingTxStateMachine ); coreState.start(); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); @@ -214,7 +214,7 @@ public void shouldApplyToLogFromCache() throws Throwable //given n things to apply in the cache, check that they are actually applied. // given - coreState.setStateMachine( txStateMachine, -1 ); + coreState.setStateMachine( txStateMachine ); coreState.start(); inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); @@ -234,7 +234,7 @@ public void cacheEntryShouldBePurgedWhenApplied() throws Throwable //given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things" // given - coreState.setStateMachine( txStateMachine, -1 ); + coreState.setStateMachine( txStateMachine ); coreState.start(); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); @@ -257,7 +257,7 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable // if the cache does not contain all things to be applied, make sure we fall back to the log // should only happen in recovery, otherwise this is probably a bug. 
- coreState.setStateMachine( txStateMachine, -1 ); + coreState.setStateMachine( txStateMachine ); coreState.start(); //given cache with missing entry @@ -293,7 +293,7 @@ public void shouldFailWhenCacheAndLogMiss() throws Exception { //When an entry is not in the log, we must fail. - coreState.setStateMachine( txStateMachine, -1 ); + coreState.setStateMachine( txStateMachine ); coreState.start(); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java index a3b72d69cc1ed..ef53330659fc0 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java @@ -144,7 +144,7 @@ public void shouldBeAbleToDownloadToNewInstanceAfterPruning() throws Exception cluster = Cluster.start( dbDir, 3, 0, params, new TestOnlyDiscoveryServiceFactory() ); - CoreGraphDatabase source = cluster.coreTx( ( db, tx ) -> { + CoreGraphDatabase leader = cluster.coreTx( ( db, tx ) -> { createData( db, 10000 ); tx.success(); } ); @@ -155,12 +155,15 @@ public void shouldBeAbleToDownloadToNewInstanceAfterPruning() throws Exception coreDb.compact(); } + cluster.removeCoreServer( leader ); // to force a change of leader + leader = cluster.awaitLeader(); + int newDbId = 3; - cluster.addCoreServerWithServerId( newDbId, 4 ); - CoreGraphDatabase newDb = cluster.getCoreServerById( 3 ); + cluster.addCoreServerWithServerId( newDbId, 3 ); + CoreGraphDatabase newDb = cluster.getCoreServerById( newDbId ); // then - assertEquals( DbRepresentation.of( source ), DbRepresentation.of( newDb ) ); + assertEquals( DbRepresentation.of( leader ), DbRepresentation.of( newDb ) ); } @Test @@ -267,7 +270,7 @@ private void doSomeTransactions( Cluster cluster, int count ) 
private String string( int numberOfCharacters ) { - StringBuffer s = new StringBuffer(); + StringBuilder s = new StringBuilder(); for ( int i = 0; i < numberOfCharacters; i++ ) { s.append( String.valueOf( i ) );