From 7783bb3edb17d285fef78898ecef22651e8fecbe Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Mon, 17 Oct 2016 14:37:32 +0100 Subject: [PATCH] Change the API of FlightMap so that we have separate methods for truncating multiple entries and removing an individual entry This will solve the problem where if we have gaps in the map then some entries will never be removed. --- .../core/consensus/ConsensusModule.java | 4 +- .../coreedge/core/consensus/RaftMachine.java | 2 +- .../consensus/log/segmented/InFlightMap.java | 32 +++++++++---- .../consensus/outcome/AppendLogEntry.java | 4 +- .../outcome/BatchAppendLogEntries.java | 4 +- .../consensus/outcome/RaftLogCommand.java | 2 +- .../consensus/outcome/TruncateLogCommand.java | 11 ++--- .../consensus/shipping/RaftLogShipper.java | 4 +- .../shipping/RaftLogShippingManager.java | 4 +- .../core/consensus/state/RaftState.java | 4 +- .../core/state/CommandApplicationProcess.java | 4 +- .../core/state/InFlightLogEntryReader.java | 8 ++-- .../core/consensus/RaftMachineBuilder.java | 2 +- .../explorer/ComparableRaftState.java | 4 +- .../InFlightLogEntriesCacheTest.java | 45 +++++++++---------- .../outcome/BatchAppendLogEntriesTest.java | 8 ++-- .../outcome/TruncateLogCommandTest.java | 43 +++++++++++++----- .../core/consensus/state/RaftStateTest.java | 12 ++--- .../state/CommandApplicationProcessTest.java | 34 +++++++------- .../state/InFlightLogEntryReaderTest.java | 21 ++++----- 20 files changed, 143 insertions(+), 109 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java index 3b82623214742..c4133fc16cf3f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java @@ -73,7 +73,7 @@ public class ConsensusModule private final RaftMachine raftMachine; private final DelayedRenewableTimeoutService raftTimeoutService; private final RaftMembershipManager raftMembershipManager; - private final InFlightMap inFlightMap = new InFlightMap<>(); + private final InFlightMap inFlightMap = new InFlightMap<>(); public ConsensusModule( MemberId myself, final PlatformModule platformModule, Outbound outbound, File clusterStateDirectory, CoreTopologyService discoveryService ) @@ -193,7 +193,7 @@ public RaftMembershipManager raftMembershipManager() return raftMembershipManager; } - public InFlightMap inFlightMap() + public InFlightMap inFlightMap() { return inFlightMap; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java index 88ed0be669dad..e6bb355c55e4a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java @@ -97,7 +97,7 @@ public RaftMachine( MemberId myself, StateStorage termStorage, Outbound outbound, LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping, - InFlightMap inFlightMap, + InFlightMap inFlightMap, Monitors monitors ) { this.myself = myself; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightMap.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightMap.java index e65a9825173ca..eaea6705205c9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightMap.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightMap.java @@ -19,14 +19,14 @@ */ package org.neo4j.coreedge.core.consensus.log.segmented; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentSkipListMap; import static java.lang.String.format; -public class InFlightMap +public class InFlightMap { - private final Map map = new ConcurrentHashMap<>(); + private final SortedMap map = new ConcurrentSkipListMap<>(); /** * Adds a new mapping. @@ -35,7 +35,7 @@ public class InFlightMap * @param value The value corresponding to the key provided. * @throws IllegalArgumentException if a mapping for the key already exists */ - public void register( K key, V value ) + public void put( Long key, V value ) { V previousValue = map.putIfAbsent( key, value ); @@ -52,19 +52,35 @@ public void register( K key, V value ) * @param key The key to use for retrieving the value from the map * @return the value for this key, otherwise null. */ - public V retrieve( K key ) + public V get( Long key ) { return map.get( key ); } /** - * Attempts to unregister this object from the map. + * Attempts to remove this object from the map. * * @param key The object to attempt unregistering. * @return true if the attempt to unregister was successful, otherwise false if this object was not found. */ - public boolean unregister( K key ) + public boolean remove( Long key ) { return map.remove( key ) != null; } + + /** + * Attempts to remove all objects at this key or higher from the map. + * + * @param key The object to attempt unregistering. + */ + public void truncate( Long key ) + { + map.tailMap( key ).keySet().forEach( map::remove ); + } + + @Override + public String toString() + { + return String.format( "InFlightMap{map=%s}", map ); + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/AppendLogEntry.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/AppendLogEntry.java index 393e33b8ba9c9..457e6da639b92 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/AppendLogEntry.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/AppendLogEntry.java @@ -49,9 +49,9 @@ public void applyTo( RaftLog raftLog, Log log ) throws IOException } @Override - public void applyTo( InFlightMap inFlightMap, Log log ) throws IOException + public void applyTo( InFlightMap inFlightMap, Log log ) throws IOException { - inFlightMap.register( index, entry ); + inFlightMap.put( index, entry ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntries.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntries.java index dffad21435ce8..d1301c0ba780d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntries.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntries.java @@ -62,11 +62,11 @@ public void applyTo( RaftLog raftLog, Log log ) throws IOException } @Override - public void applyTo( InFlightMap inFlightMap, Log log ) + public void applyTo( InFlightMap inFlightMap, Log log ) { for ( int i = offset; i < entries.length; i++ ) { - inFlightMap.register( baseIndex + i , entries[i]); + inFlightMap.put( baseIndex + i , entries[i]); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/RaftLogCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/RaftLogCommand.java index 770a7c2b9b17a..73add13be96b9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/RaftLogCommand.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/RaftLogCommand.java @@ -38,5 +38,5 @@ interface Handler void applyTo( RaftLog raftLog, Log log ) throws IOException; - void applyTo( InFlightMap inFlightMap, Log log ) throws IOException; + void applyTo( InFlightMap inFlightMap, Log log ) throws IOException; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommand.java index cb3e3747dfa04..3731bb8792d7c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommand.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommand.java @@ -49,15 +49,10 @@ public void applyTo( RaftLog raftLog, Log log ) throws IOException } @Override - public void applyTo( InFlightMap inFlightMap, Log log ) throws IOException + public void applyTo( InFlightMap inFlightMap, Log log ) throws IOException { - long truncateIndex = fromIndex; - log.debug( "Start truncating in-flight-map from index %d. Current map:%n%s", truncateIndex, inFlightMap ); - while ( inFlightMap.unregister( truncateIndex ) ) - { - truncateIndex++; - } - log.debug( "End truncating in-flight-map at index %d", truncateIndex - 1 ); + log.debug( "Start truncating in-flight-map from index %d. Current map:%n%s", fromIndex, inFlightMap ); + inFlightMap.truncate( fromIndex ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper.java index 4bd9a81558c26..d599b51b6e3f8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper.java @@ -111,7 +111,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private final long retryTimeMillis; private final int catchupBatchSize; private final int maxAllowedShippingLag; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private DelayedRenewableTimeoutService timeoutService; private RenewableTimeout timeout; @@ -124,7 +124,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName RaftLogShipper( Outbound outbound, LogProvider logProvider, ReadableRaftLog raftLog, Clock clock, MemberId leader, MemberId follower, long leaderTerm, long leaderCommit, long retryTimeMillis, - int catchupBatchSize, int maxAllowedShippingLag, InFlightMap inFlightMap ) + int catchupBatchSize, int maxAllowedShippingLag, InFlightMap inFlightMap ) { this.outbound = outbound; this.catchupBatchSize = catchupBatchSize; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShippingManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShippingManager.java index 7e063df74b43e..c1a9910ac373d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShippingManager.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShippingManager.java @@ -51,7 +51,7 @@ public class RaftLogShippingManager extends LifecycleAdapter implements RaftMemb private final long retryTimeMillis; private final int catchupBatchSize; private final int maxAllowedShippingLag; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private Map logShippers = new HashMap<>(); private LeaderContext lastLeaderContext; @@ -63,7 +63,7 @@ public RaftLogShippingManager( Outbound outbo ReadableRaftLog raftLog, Clock clock, MemberId myself, RaftMembership membership, long retryTimeMillis, int catchupBatchSize, int maxAllowedShippingLag, - InFlightMap inFlightMap ) + InFlightMap inFlightMap ) { this.outbound = outbound; this.logProvider = logProvider; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java index a9ee5786d45be..8da4d6e9c3ae0 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java @@ -46,7 +46,7 @@ public class RaftState implements ReadableRaftState private final RaftMembership membership; private final Log log; private final RaftLog entryLog; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private TermState termState; private VoteState voteState; @@ -63,7 +63,7 @@ public RaftState( MemberId myself, RaftMembership membership, RaftLog entryLog, StateStorage voteStorage, - InFlightMap inFlightMap, LogProvider logProvider ) + InFlightMap inFlightMap, LogProvider logProvider ) { this.myself = myself; this.termStorage = termStorage; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java index 3e19f6232eaa9..e5c5f04e9d2f6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java @@ -56,7 +56,7 @@ public class CommandApplicationProcess extends LifecycleAdapter private final ProgressTracker progressTracker; private final SessionTracker sessionTracker; private final Supplier dbHealth; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private final Log log; private final CoreStateApplier applier; private final RaftLogCommitIndexMonitor commitIndexMonitor; @@ -81,7 +81,7 @@ public CommandApplicationProcess( StateStorage lastFlushedStorage, SessionTracker sessionTracker, CoreStateApplier applier, - InFlightMap inFlightMap, + InFlightMap inFlightMap, Monitors monitors ) { this.coreStateMachines = coreStateMachines; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/InFlightLogEntryReader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/InFlightLogEntryReader.java index 2c147101ad468..477bf5b298b5b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/InFlightLogEntryReader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/InFlightLogEntryReader.java @@ -31,13 +31,13 @@ public class InFlightLogEntryReader implements AutoCloseable { private final ReadableRaftLog raftLog; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private final boolean removeReadIndexFromMap; private RaftLogCursor cursor; private boolean useInFlightMap = true; - public InFlightLogEntryReader( ReadableRaftLog raftLog, InFlightMap inFlightMap, + public InFlightLogEntryReader( ReadableRaftLog raftLog, InFlightMap inFlightMap, boolean removeReadIndexFromMap ) { this.raftLog = raftLog; @@ -51,7 +51,7 @@ public RaftLogEntry get( long logIndex ) throws IOException if ( useInFlightMap ) { - entry = inFlightMap.retrieve( logIndex ); + entry = inFlightMap.get( logIndex ); } if ( entry == null ) @@ -62,7 +62,7 @@ public RaftLogEntry get( long logIndex ) throws IOException if ( removeReadIndexFromMap ) { - inFlightMap.unregister( logIndex ); + inFlightMap.remove( logIndex ); } return entry; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/RaftMachineBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/RaftMachineBuilder.java index 6c4ce847a4c2a..467b2ff8698b1 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/RaftMachineBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/RaftMachineBuilder.java @@ -77,7 +77,7 @@ public class RaftMachineBuilder new InMemoryStateStorage<>( new RaftMembershipState() ); private Monitors monitors = new Monitors(); private CommitListener commitListener = commitIndex -> {}; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; public RaftMachineBuilder( MemberId member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder ) { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/explorer/ComparableRaftState.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/explorer/ComparableRaftState.java index 1f958104b5ab4..56a7ecc97b5ee 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/explorer/ComparableRaftState.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/explorer/ComparableRaftState.java @@ -53,11 +53,11 @@ public class ComparableRaftState implements ReadableRaftState private long lastLogIndexBeforeWeBecameLeader = -1; private FollowerStates followerStates = new FollowerStates<>(); protected final RaftLog entryLog; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private long commitIndex = -1; ComparableRaftState( MemberId myself, Set votingMembers, Set replicationMembers, - RaftLog entryLog, InFlightMap inFlightMap, LogProvider logProvider ) + RaftLog entryLog, InFlightMap inFlightMap, LogProvider logProvider ) { this.myself = myself; this.votingMembers = votingMembers; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java index a15995fbf3461..9f75f33a00366 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java @@ -21,8 +21,8 @@ import org.junit.Test; -import java.util.LinkedList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -34,61 +34,58 @@ public class InFlightLogEntriesCacheTest @Test public void shouldRegisterAndUnregisterValues() throws Exception { - InFlightMap entries = new InFlightMap<>(); + InFlightMap entries = new InFlightMap<>(); - List logEntryList = new LinkedList<>(); - logEntryList.add( new Object() ); + Map logEntryList = new HashMap<>(); + logEntryList.put(1L, new Object() ); - for ( Object registeredEntry : logEntryList ) + for ( Map.Entry entry : logEntryList.entrySet() ) { - entries.register( registeredEntry, registeredEntry ); + entries.put( entry.getKey(), entry.getValue() ); } - for ( Object expected : logEntryList ) + for ( Map.Entry entry : logEntryList.entrySet() ) { - Object retrieved = entries.retrieve( expected ); - assertEquals( expected, retrieved ); + Object retrieved = entries.get( entry.getKey() ); + assertEquals( entry.getValue(), retrieved ); } - Object unexpected = new Object(); - Object shouldBeNull = entries.retrieve( unexpected ); + Long unexpected = 2L; + Object shouldBeNull = entries.get( unexpected ); assertNull( shouldBeNull ); - for ( Object expected : logEntryList ) + for ( Map.Entry entry : logEntryList.entrySet() ) { - boolean wasThere = entries.unregister( expected ); + boolean wasThere = entries.remove( entry.getKey() ); assertEquals( true, wasThere ); } - - boolean shouldBeFalse = entries.unregister( unexpected ); - assertFalse( shouldBeFalse ); } @Test( expected = IllegalArgumentException.class ) public void shouldNotReinsertValues() throws Exception { - InFlightMap entries = new InFlightMap<>(); + InFlightMap entries = new InFlightMap<>(); Object addedObject = new Object(); - entries.register( addedObject, addedObject ); - entries.register( addedObject, addedObject ); + entries.put( 1L, addedObject ); + entries.put( 1L, addedObject ); } @Test public void shouldNotReplaceRegisteredValues() throws Exception { - InFlightMap cache = new InFlightMap<>(); + InFlightMap cache = new InFlightMap<>(); Object first = new Object(); Object second = new Object(); try { - cache.register( first, first ); - cache.register( first, second ); + cache.put( 1L, first ); + cache.put( 1L, second ); fail("Should not allow silent replacement of values"); } catch ( IllegalArgumentException e ) { - assertEquals( first, cache.retrieve( first ) ); + assertEquals( first, cache.get( 1L ) ); } } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntriesTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntriesTest.java index e865bcb11b53a..8a59c602b008a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntriesTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntriesTest.java @@ -90,14 +90,14 @@ public void applyTo() throws Exception BatchAppendLogEntries batchAppend = new BatchAppendLogEntries( baseIndex, offset, entries ); - InFlightMap cache = new InFlightMap<>(); + InFlightMap cache = new InFlightMap<>(); //when batchAppend.applyTo( cache, log ); //then - assertNull( cache.retrieve( 0L ) ); - assertNotNull( cache.retrieve( 1L ) ); - assertNotNull( cache.retrieve( 2L ) ); + assertNull( cache.get( 0L ) ); + assertNotNull( cache.get( 1L ) ); + assertNotNull( cache.get( 2L ) ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommandTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommandTest.java index 7756a82edcd7a..04b14e1e5f1fd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommandTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommandTest.java @@ -44,25 +44,46 @@ public void applyTo() throws Exception Log log = logProvider.getLog( getClass() ); long fromIndex = 2L; TruncateLogCommand truncateLogCommand = new TruncateLogCommand( fromIndex ); - InFlightMap inFlightMap = new InFlightMap<>(); + InFlightMap inFlightMap = new InFlightMap<>(); - inFlightMap.register( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) ); - inFlightMap.register( 1L, new RaftLogEntry( 1L, valueOf( 1 ) ) ); - inFlightMap.register( 2L, new RaftLogEntry( 2L, valueOf( 2 ) ) ); - inFlightMap.register( 3L, new RaftLogEntry( 3L, valueOf( 3 ) ) ); + inFlightMap.put( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) ); + inFlightMap.put( 1L, new RaftLogEntry( 1L, valueOf( 1 ) ) ); + inFlightMap.put( 2L, new RaftLogEntry( 2L, valueOf( 2 ) ) ); + inFlightMap.put( 3L, new RaftLogEntry( 3L, valueOf( 3 ) ) ); //when truncateLogCommand.applyTo( inFlightMap, log ); //then - assertNotNull( inFlightMap.retrieve( 0L ) ); - assertNotNull( inFlightMap.retrieve( 1L ) ); - assertNull( inFlightMap.retrieve( 2L ) ); - assertNull( inFlightMap.retrieve( 3L ) ); + assertNotNull( inFlightMap.get( 0L ) ); + assertNotNull( inFlightMap.get( 1L ) ); + assertNull( inFlightMap.get( 2L ) ); + assertNull( inFlightMap.get( 3L ) ); logProvider.assertAtLeastOnce( inLog( getClass() ) .debug( "Start truncating in-flight-map from index %d. Current map:%n%s", fromIndex, inFlightMap ) ); - logProvider.assertAtLeastOnce( inLog( getClass() ) - .debug( "End truncating in-flight-map at index %d", fromIndex + 1 ) ); + } + + @Test + public void shouldTruncateWithGaps() throws Exception + { + //given + long fromIndex = 1L; + TruncateLogCommand truncateLogCommand = new TruncateLogCommand( fromIndex ); + + InFlightMap inFlightMap = new InFlightMap<>(); + + inFlightMap.put( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) ); + inFlightMap.put( 2L, new RaftLogEntry( 1L, valueOf( 1 ) ) ); + inFlightMap.put( 4L, new RaftLogEntry( 2L, valueOf( 2 ) ) ); + + truncateLogCommand.applyTo( inFlightMap, NullLog.getInstance() ); + + inFlightMap.put( 1L, new RaftLogEntry( 3L, valueOf( 1 ) ) ); + inFlightMap.put( 2L, new RaftLogEntry( 4L, valueOf( 2 ) ) ); + + assertNotNull( inFlightMap.get( 0L ) ); + assertNotNull( inFlightMap.get( 1L ) ); + assertNotNull( inFlightMap.get( 2L ) ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/state/RaftStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/state/RaftStateTest.java index f698a89fbbc24..fdd8619fcc265 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/state/RaftStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/state/RaftStateTest.java @@ -62,7 +62,7 @@ public void shouldUpdateCacheState() throws Exception //Test that updates applied to the raft state will be refelcted in the entry cache. //given - InFlightMap cache = new InFlightMap<>(); + InFlightMap cache = new InFlightMap<>(); RaftState raftState = new RaftState( member( 0 ), new InMemoryStateStorage<>( new TermState() ), new FakeMembership(), new InMemoryRaftLog(), new InMemoryStateStorage<>( new VoteState() ), cache, NullLogProvider.getInstance() ); @@ -85,11 +85,11 @@ public void shouldUpdateCacheState() throws Exception raftState.update(raftTestMemberOutcome); //then - assertNotNull( cache.retrieve( 1L ) ); - assertNotNull( cache.retrieve( 2L ) ); - assertNotNull( cache.retrieve( 3L ) ); - assertEquals( valueOf( 5 ), cache.retrieve( 3L ).content() ); - assertNull( cache.retrieve( 4L ) ); + assertNotNull( cache.get( 1L ) ); + assertNotNull( cache.get( 2L ) ); + assertNotNull( cache.get( 3L ) ); + assertEquals( valueOf( 5 ), cache.get( 3L ).content() ); + assertNull( cache.get( 4L ) ); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CommandApplicationProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CommandApplicationProcessTest.java index 990ef7eb1e580..ba45afcad019b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CommandApplicationProcessTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CommandApplicationProcessTest.java @@ -80,7 +80,7 @@ public class CommandApplicationProcessTest private final int batchSize = 16; private final CoreStateApplier applier = new CoreStateApplier( NullLogProvider.getInstance() ); - private InFlightMap inFlightMap = spy( new InFlightMap<>() ); + private InFlightMap inFlightMap = spy( new InFlightMap<>() ); private final Monitors monitors = new Monitors(); private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class ); private final CommandApplicationProcess applicationProcess = new CommandApplicationProcess( @@ -282,14 +282,14 @@ public void shouldApplyToLogFromCache() throws Throwable // given applicationProcess.start(); - inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); + inFlightMap.put( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); //when applicationProcess.notifyCommitted( 0 ); applier.sync( false ); //then the cache should have had it's get method called. - verify( inFlightMap, times( 1 ) ).retrieve( 0L ); + verify( inFlightMap, times( 1 ) ).get( 0L ); verifyZeroInteractions( raftLog ); } @@ -299,18 +299,18 @@ public void cacheEntryShouldBePurgedWhenApplied() throws Throwable //given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things" applicationProcess.start(); - inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); - inFlightMap.register( 1L, new RaftLogEntry( 0, operation( nullTx ) ) ); - inFlightMap.register( 2L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.put( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.put( 1L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.put( 2L, new RaftLogEntry( 0, operation( nullTx ) ) ); //when applicationProcess.notifyCommitted( 0 ); applier.sync( false ); //then the cache should have had its get method called. - assertNull( inFlightMap.retrieve( 0L ) ); - assertNotNull( inFlightMap.retrieve( 1L ) ); - assertNotNull( inFlightMap.retrieve( 2L ) ); + assertNull( inFlightMap.get( 0L ) ); + assertNotNull( inFlightMap.get( 1L ) ); + assertNotNull( inFlightMap.get( 2L ) ); } @Test @@ -325,8 +325,8 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable ReplicatedContent operation1 = operation( nullTx ); ReplicatedContent operation2 = operation( nullTx ); - inFlightMap.register( 0L, new RaftLogEntry( 0, operation0 ) ); - inFlightMap.register( 2L, new RaftLogEntry( 2, operation2 ) ); + inFlightMap.put( 0L, new RaftLogEntry( 0, operation0 ) ); + inFlightMap.put( 2L, new RaftLogEntry( 2, operation2 ) ); raftLog.append( new RaftLogEntry( 0, operation0 ) ); raftLog.append( new RaftLogEntry( 1, operation1 ) ); @@ -337,9 +337,13 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable applier.sync( false ); - //then the cache should have had its get method called. - verify( inFlightMap, times( 0 ) ).retrieve( 2L ); - verify( inFlightMap, times( 3 ) ).unregister( anyLong() ); //everything is cleaned up + //then the cache stops being used after it finds 1 is missing + verify( inFlightMap, times( 1 ) ).get( 0L ); + verify( inFlightMap, times( 1 ) ).get( 1L ); + verify( inFlightMap, times( 0 ) ).get( 2L ); + + // we only look up 1 so let's remove that from the cache + verify( inFlightMap, times( 1 ) ).remove( 0L ); verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 0L ), anyCallback() ); verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); @@ -354,7 +358,7 @@ public void shouldFailWhenCacheAndLogMiss() throws Throwable //When an entry is not in the log, we must fail. applicationProcess.start(); - inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.put( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); raftLog.append( new RaftLogEntry( 1, operation( nullTx ) ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/InFlightLogEntryReaderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/InFlightLogEntryReaderTest.java index 788c9718f47f3..b083343b95d90 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/InFlightLogEntryReaderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/InFlightLogEntryReaderTest.java @@ -47,7 +47,7 @@ public class InFlightLogEntryReaderTest { private final ReadableRaftLog raftLog = mock( ReadableRaftLog.class ); @SuppressWarnings( "unchecked" ) - private final InFlightMap inFlightMap = mock( InFlightMap.class ); + private final InFlightMap inFlightMap = mock( InFlightMap.class ); private final long logIndex = 42L; private final RaftLogEntry entry = mock( RaftLogEntry.class ); @@ -73,7 +73,7 @@ public void shouldUseTheCacheWhenTheIndexIsPresent() throws Exception // then assertEquals( entry, raftLogEntry ); - verify( inFlightMap ).retrieve( logIndex ); + verify( inFlightMap ).get( logIndex ); assertCacheIsUpdated( inFlightMap, logIndex ); verifyNoMoreInteractions( inFlightMap ); verifyZeroInteractions( raftLog ); @@ -92,9 +92,10 @@ public void shouldUseTheRaftLogWhenTheIndexIsNotPresent() throws Exception // then assertEquals( entry, raftLogEntry ); - verify( inFlightMap ).retrieve( logIndex ); + verify( inFlightMap ).get( logIndex ); verify( raftLog ).getEntryCursor( logIndex ); assertCacheIsUpdated( inFlightMap, logIndex ); + verifyNoMoreInteractions( inFlightMap ); verifyNoMoreInteractions( raftLog ); } @@ -118,7 +119,7 @@ public void shouldNeverUseMapAgainAfterHavingFeltBackToTheRaftLog() throws Excep if ( offset <= 1) { - verify( inFlightMap ).retrieve( offset + logIndex ); + verify( inFlightMap ).get( offset + logIndex ); } if ( offset == 1 ) @@ -133,13 +134,13 @@ public void shouldNeverUseMapAgainAfterHavingFeltBackToTheRaftLog() throws Excep verifyNoMoreInteractions( raftLog ); } - private void startingFromIndexReturnEntries( InFlightMap inFlightMap, long startIndex, + private void startingFromIndexReturnEntries( InFlightMap inFlightMap, long startIndex, RaftLogEntry entry, RaftLogEntry... otherEntries ) throws IOException { - when( inFlightMap.retrieve( startIndex ) ).thenReturn( entry ); + when( inFlightMap.get( startIndex ) ).thenReturn( entry ); for ( int offset = 0; offset < otherEntries.length; offset++ ) { - when( inFlightMap.retrieve( startIndex + offset + 1L ) ).thenReturn( otherEntries[offset] ); + when( inFlightMap.get( startIndex + offset + 1L ) ).thenReturn( otherEntries[offset] ); } } @@ -168,15 +169,15 @@ private void startingFromIndexReturnEntries( ReadableRaftLog raftLog, long start when( cursor.get() ).thenReturn( entry, raftLogEntries ); } - public void assertCacheIsUpdated( InFlightMap inFlightMap, long key ) + public void assertCacheIsUpdated( InFlightMap inFlightMap, long key ) { if ( clearCache ) { - verify( inFlightMap, times( 1 ) ).unregister( key ); + verify( inFlightMap, times( 1 ) ).remove( key ); } else { - verify( inFlightMap, never() ).unregister( key ); + verify( inFlightMap, never() ).remove( key ); } } }