diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java index 3b82623214742..c4133fc16cf3f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java @@ -73,7 +73,7 @@ public class ConsensusModule private final RaftMachine raftMachine; private final DelayedRenewableTimeoutService raftTimeoutService; private final RaftMembershipManager raftMembershipManager; - private final InFlightMap inFlightMap = new InFlightMap<>(); + private final InFlightMap inFlightMap = new InFlightMap<>(); public ConsensusModule( MemberId myself, final PlatformModule platformModule, Outbound outbound, File clusterStateDirectory, CoreTopologyService discoveryService ) @@ -193,7 +193,7 @@ public RaftMembershipManager raftMembershipManager() return raftMembershipManager; } - public InFlightMap inFlightMap() + public InFlightMap inFlightMap() { return inFlightMap; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java index 88ed0be669dad..e6bb355c55e4a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMachine.java @@ -97,7 +97,7 @@ public RaftMachine( MemberId myself, StateStorage termStorage, Outbound outbound, LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping, - InFlightMap inFlightMap, + InFlightMap inFlightMap, Monitors monitors ) { this.myself = myself; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightMap.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightMap.java index e65a9825173ca..eaea6705205c9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightMap.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightMap.java @@ -19,14 +19,14 @@ */ package org.neo4j.coreedge.core.consensus.log.segmented; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentSkipListMap; import static java.lang.String.format; -public class InFlightMap +public class InFlightMap { - private final Map map = new ConcurrentHashMap<>(); + private final SortedMap map = new ConcurrentSkipListMap<>(); /** * Adds a new mapping. @@ -35,7 +35,7 @@ public class InFlightMap * @param value The value corresponding to the key provided. * @throws IllegalArgumentException if a mapping for the key already exists */ - public void register( K key, V value ) + public void put( Long key, V value ) { V previousValue = map.putIfAbsent( key, value ); @@ -52,19 +52,35 @@ public void register( K key, V value ) * @param key The key to use for retrieving the value from the map * @return the value for this key, otherwise null. */ - public V retrieve( K key ) + public V get( Long key ) { return map.get( key ); } /** - * Attempts to unregister this object from the map. + * Attempts to remove this object from the map. * * @param key The object to attempt unregistering. * @return true if the attempt to unregister was successful, otherwise false if this object was not found. */ - public boolean unregister( K key ) + public boolean remove( Long key ) { return map.remove( key ) != null; } + + /** + * Attempts to remove all objects at this key or higher from the map. + * + * @param key The object to attempt unregistering. + */ + public void truncate( Long key ) + { + map.tailMap( key ).keySet().forEach( map::remove ); + } + + @Override + public String toString() + { + return String.format( "InFlightMap{map=%s}", map ); + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/AppendLogEntry.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/AppendLogEntry.java index 393e33b8ba9c9..457e6da639b92 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/AppendLogEntry.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/AppendLogEntry.java @@ -49,9 +49,9 @@ public void applyTo( RaftLog raftLog, Log log ) throws IOException } @Override - public void applyTo( InFlightMap inFlightMap, Log log ) throws IOException + public void applyTo( InFlightMap inFlightMap, Log log ) throws IOException { - inFlightMap.register( index, entry ); + inFlightMap.put( index, entry ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntries.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntries.java index dffad21435ce8..d1301c0ba780d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntries.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntries.java @@ -62,11 +62,11 @@ public void applyTo( RaftLog raftLog, Log log ) throws IOException } @Override - public void applyTo( InFlightMap inFlightMap, Log log ) + public void applyTo( InFlightMap inFlightMap, Log log ) { for ( int i = offset; i < entries.length; i++ ) { - inFlightMap.register( baseIndex + i , entries[i]); + inFlightMap.put( baseIndex + i , entries[i]); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/RaftLogCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/RaftLogCommand.java index 770a7c2b9b17a..73add13be96b9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/RaftLogCommand.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/RaftLogCommand.java @@ -38,5 +38,5 @@ interface Handler void applyTo( RaftLog raftLog, Log log ) throws IOException; - void applyTo( InFlightMap inFlightMap, Log log ) throws IOException; + void applyTo( InFlightMap inFlightMap, Log log ) throws IOException; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommand.java index cb3e3747dfa04..3731bb8792d7c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommand.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommand.java @@ -49,15 +49,10 @@ public void applyTo( RaftLog raftLog, Log log ) throws IOException } @Override - public void applyTo( InFlightMap inFlightMap, Log log ) throws IOException + public void applyTo( InFlightMap inFlightMap, Log log ) throws IOException { - long truncateIndex = fromIndex; - log.debug( "Start truncating in-flight-map from index %d. Current map:%n%s", truncateIndex, inFlightMap ); - while ( inFlightMap.unregister( truncateIndex ) ) - { - truncateIndex++; - } - log.debug( "End truncating in-flight-map at index %d", truncateIndex - 1 ); + log.debug( "Start truncating in-flight-map from index %d. Current map:%n%s", fromIndex, inFlightMap ); + inFlightMap.truncate( fromIndex ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper.java index 4bd9a81558c26..d599b51b6e3f8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper.java @@ -111,7 +111,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private final long retryTimeMillis; private final int catchupBatchSize; private final int maxAllowedShippingLag; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private DelayedRenewableTimeoutService timeoutService; private RenewableTimeout timeout; @@ -124,7 +124,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName RaftLogShipper( Outbound outbound, LogProvider logProvider, ReadableRaftLog raftLog, Clock clock, MemberId leader, MemberId follower, long leaderTerm, long leaderCommit, long retryTimeMillis, - int catchupBatchSize, int maxAllowedShippingLag, InFlightMap inFlightMap ) + int catchupBatchSize, int maxAllowedShippingLag, InFlightMap inFlightMap ) { this.outbound = outbound; this.catchupBatchSize = catchupBatchSize; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShippingManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShippingManager.java index 7e063df74b43e..c1a9910ac373d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShippingManager.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/shipping/RaftLogShippingManager.java @@ -51,7 +51,7 @@ public class RaftLogShippingManager extends LifecycleAdapter implements RaftMemb private final long retryTimeMillis; private final int catchupBatchSize; private final int maxAllowedShippingLag; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private Map logShippers = new HashMap<>(); private LeaderContext lastLeaderContext; @@ -63,7 +63,7 @@ public RaftLogShippingManager( Outbound outbo ReadableRaftLog raftLog, Clock clock, MemberId myself, RaftMembership membership, long retryTimeMillis, int catchupBatchSize, int maxAllowedShippingLag, - InFlightMap inFlightMap ) + InFlightMap inFlightMap ) { this.outbound = outbound; this.logProvider = logProvider; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java index a9ee5786d45be..8da4d6e9c3ae0 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/state/RaftState.java @@ -46,7 +46,7 @@ public class RaftState implements ReadableRaftState private final RaftMembership membership; private final Log log; private final RaftLog entryLog; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private TermState termState; private VoteState voteState; @@ -63,7 +63,7 @@ public RaftState( MemberId myself, RaftMembership membership, RaftLog entryLog, StateStorage voteStorage, - InFlightMap inFlightMap, LogProvider logProvider ) + InFlightMap inFlightMap, LogProvider logProvider ) { this.myself = myself; this.termStorage = termStorage; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java index 3e19f6232eaa9..e5c5f04e9d2f6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CommandApplicationProcess.java @@ -56,7 +56,7 @@ public class CommandApplicationProcess extends LifecycleAdapter private final ProgressTracker progressTracker; private final SessionTracker sessionTracker; private final Supplier dbHealth; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private final Log log; private final CoreStateApplier applier; private final RaftLogCommitIndexMonitor commitIndexMonitor; @@ -81,7 +81,7 @@ public CommandApplicationProcess( StateStorage lastFlushedStorage, SessionTracker sessionTracker, CoreStateApplier applier, - InFlightMap inFlightMap, + InFlightMap inFlightMap, Monitors monitors ) { this.coreStateMachines = coreStateMachines; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/InFlightLogEntryReader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/InFlightLogEntryReader.java index 2c147101ad468..477bf5b298b5b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/InFlightLogEntryReader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/InFlightLogEntryReader.java @@ -31,13 +31,13 @@ public class InFlightLogEntryReader implements AutoCloseable { private final ReadableRaftLog raftLog; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private final boolean removeReadIndexFromMap; private RaftLogCursor cursor; private boolean useInFlightMap = true; - public InFlightLogEntryReader( ReadableRaftLog raftLog, InFlightMap inFlightMap, + public InFlightLogEntryReader( ReadableRaftLog raftLog, InFlightMap inFlightMap, boolean removeReadIndexFromMap ) { this.raftLog = raftLog; @@ -51,7 +51,7 @@ public RaftLogEntry get( long logIndex ) throws IOException if ( useInFlightMap ) { - entry = inFlightMap.retrieve( logIndex ); + entry = inFlightMap.get( logIndex ); } if ( entry == null ) @@ -62,7 +62,7 @@ public RaftLogEntry get( long logIndex ) throws IOException if ( removeReadIndexFromMap ) { - inFlightMap.unregister( logIndex ); + inFlightMap.remove( logIndex ); } return entry; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/RaftMachineBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/RaftMachineBuilder.java index 6c4ce847a4c2a..467b2ff8698b1 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/RaftMachineBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/RaftMachineBuilder.java @@ -77,7 +77,7 @@ public class RaftMachineBuilder new InMemoryStateStorage<>( new RaftMembershipState() ); private Monitors monitors = new Monitors(); private CommitListener commitListener = commitIndex -> {}; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; public RaftMachineBuilder( MemberId member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder ) { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/explorer/ComparableRaftState.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/explorer/ComparableRaftState.java index 1f958104b5ab4..56a7ecc97b5ee 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/explorer/ComparableRaftState.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/explorer/ComparableRaftState.java @@ -53,11 +53,11 @@ public class ComparableRaftState implements ReadableRaftState private long lastLogIndexBeforeWeBecameLeader = -1; private FollowerStates followerStates = new FollowerStates<>(); protected final RaftLog entryLog; - private final InFlightMap inFlightMap; + private final InFlightMap inFlightMap; private long commitIndex = -1; ComparableRaftState( MemberId myself, Set votingMembers, Set replicationMembers, - RaftLog entryLog, InFlightMap inFlightMap, LogProvider logProvider ) + RaftLog entryLog, InFlightMap inFlightMap, LogProvider logProvider ) { this.myself = myself; this.votingMembers = votingMembers; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java index a15995fbf3461..9f75f33a00366 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/InFlightLogEntriesCacheTest.java @@ -21,8 +21,8 @@ import org.junit.Test; -import java.util.LinkedList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -34,61 +34,58 @@ public class InFlightLogEntriesCacheTest @Test public void shouldRegisterAndUnregisterValues() throws Exception { - InFlightMap entries = new InFlightMap<>(); + InFlightMap entries = new InFlightMap<>(); - List logEntryList = new LinkedList<>(); - logEntryList.add( new Object() ); + Map logEntryList = new HashMap<>(); + logEntryList.put(1L, new Object() ); - for ( Object registeredEntry : logEntryList ) + for ( Map.Entry entry : logEntryList.entrySet() ) { - entries.register( registeredEntry, registeredEntry ); + entries.put( entry.getKey(), entry.getValue() ); } - for ( Object expected : logEntryList ) + for ( Map.Entry entry : logEntryList.entrySet() ) { - Object retrieved = entries.retrieve( expected ); - assertEquals( expected, retrieved ); + Object retrieved = entries.get( entry.getKey() ); + assertEquals( entry.getValue(), retrieved ); } - Object unexpected = new Object(); - Object shouldBeNull = entries.retrieve( unexpected ); + Long unexpected = 2L; + Object shouldBeNull = entries.get( unexpected ); assertNull( shouldBeNull ); - for ( Object expected : logEntryList ) + for ( Map.Entry entry : logEntryList.entrySet() ) { - boolean wasThere = entries.unregister( expected ); + boolean wasThere = entries.remove( entry.getKey() ); assertEquals( true, wasThere ); } - - boolean shouldBeFalse = entries.unregister( unexpected ); - assertFalse( shouldBeFalse ); } @Test( expected = IllegalArgumentException.class ) public void shouldNotReinsertValues() throws Exception { - InFlightMap entries = new InFlightMap<>(); + InFlightMap entries = new InFlightMap<>(); Object addedObject = new Object(); - entries.register( addedObject, addedObject ); - entries.register( addedObject, addedObject ); + entries.put( 1L, addedObject ); + entries.put( 1L, addedObject ); } @Test public void shouldNotReplaceRegisteredValues() throws Exception { - InFlightMap cache = new InFlightMap<>(); + InFlightMap cache = new InFlightMap<>(); Object first = new Object(); Object second = new Object(); try { - cache.register( first, first ); - cache.register( first, second ); + cache.put( 1L, first ); + cache.put( 1L, second ); fail("Should not allow silent replacement of values"); } catch ( IllegalArgumentException e ) { - assertEquals( first, cache.retrieve( first ) ); + assertEquals( first, cache.get( 1L ) ); } } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntriesTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntriesTest.java index e865bcb11b53a..8a59c602b008a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntriesTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/BatchAppendLogEntriesTest.java @@ -90,14 +90,14 @@ public void applyTo() throws Exception BatchAppendLogEntries batchAppend = new BatchAppendLogEntries( baseIndex, offset, entries ); - InFlightMap cache = new InFlightMap<>(); + InFlightMap cache = new InFlightMap<>(); //when batchAppend.applyTo( cache, log ); //then - assertNull( cache.retrieve( 0L ) ); - assertNotNull( cache.retrieve( 1L ) ); - assertNotNull( cache.retrieve( 2L ) ); + assertNull( cache.get( 0L ) ); + assertNotNull( cache.get( 1L ) ); + assertNotNull( cache.get( 2L ) ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommandTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommandTest.java index 7756a82edcd7a..04b14e1e5f1fd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommandTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/outcome/TruncateLogCommandTest.java @@ -44,25 +44,46 @@ public void applyTo() throws Exception Log log = logProvider.getLog( getClass() ); long fromIndex = 2L; TruncateLogCommand truncateLogCommand = new TruncateLogCommand( fromIndex ); - InFlightMap inFlightMap = new InFlightMap<>(); + InFlightMap inFlightMap = new InFlightMap<>(); - inFlightMap.register( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) ); - inFlightMap.register( 1L, new RaftLogEntry( 1L, valueOf( 1 ) ) ); - inFlightMap.register( 2L, new RaftLogEntry( 2L, valueOf( 2 ) ) ); - inFlightMap.register( 3L, new RaftLogEntry( 3L, valueOf( 3 ) ) ); + inFlightMap.put( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) ); + inFlightMap.put( 1L, new RaftLogEntry( 1L, valueOf( 1 ) ) ); + inFlightMap.put( 2L, new RaftLogEntry( 2L, valueOf( 2 ) ) ); + inFlightMap.put( 3L, new RaftLogEntry( 3L, valueOf( 3 ) ) ); //when truncateLogCommand.applyTo( inFlightMap, log ); //then - assertNotNull( inFlightMap.retrieve( 0L ) ); - assertNotNull( inFlightMap.retrieve( 1L ) ); - assertNull( inFlightMap.retrieve( 2L ) ); - assertNull( inFlightMap.retrieve( 3L ) ); + assertNotNull( inFlightMap.get( 0L ) ); + assertNotNull( inFlightMap.get( 1L ) ); + assertNull( inFlightMap.get( 2L ) ); + assertNull( inFlightMap.get( 3L ) ); logProvider.assertAtLeastOnce( inLog( getClass() ) .debug( "Start truncating in-flight-map from index %d. Current map:%n%s", fromIndex, inFlightMap ) ); - logProvider.assertAtLeastOnce( inLog( getClass() ) - .debug( "End truncating in-flight-map at index %d", fromIndex + 1 ) ); + } + + @Test + public void shouldTruncateWithGaps() throws Exception + { + //given + long fromIndex = 1L; + TruncateLogCommand truncateLogCommand = new TruncateLogCommand( fromIndex ); + + InFlightMap inFlightMap = new InFlightMap<>(); + + inFlightMap.put( 0L, new RaftLogEntry( 0L, valueOf( 0 ) ) ); + inFlightMap.put( 2L, new RaftLogEntry( 1L, valueOf( 1 ) ) ); + inFlightMap.put( 4L, new RaftLogEntry( 2L, valueOf( 2 ) ) ); + + truncateLogCommand.applyTo( inFlightMap, NullLog.getInstance() ); + + inFlightMap.put( 1L, new RaftLogEntry( 3L, valueOf( 1 ) ) ); + inFlightMap.put( 2L, new RaftLogEntry( 4L, valueOf( 2 ) ) ); + + assertNotNull( inFlightMap.get( 0L ) ); + assertNotNull( inFlightMap.get( 1L ) ); + assertNotNull( inFlightMap.get( 2L ) ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/state/RaftStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/state/RaftStateTest.java index f698a89fbbc24..fdd8619fcc265 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/state/RaftStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/state/RaftStateTest.java @@ -62,7 +62,7 @@ public void shouldUpdateCacheState() throws Exception //Test that updates applied to the raft state will be refelcted in the entry cache. //given - InFlightMap cache = new InFlightMap<>(); + InFlightMap cache = new InFlightMap<>(); RaftState raftState = new RaftState( member( 0 ), new InMemoryStateStorage<>( new TermState() ), new FakeMembership(), new InMemoryRaftLog(), new InMemoryStateStorage<>( new VoteState() ), cache, NullLogProvider.getInstance() ); @@ -85,11 +85,11 @@ public void shouldUpdateCacheState() throws Exception raftState.update(raftTestMemberOutcome); //then - assertNotNull( cache.retrieve( 1L ) ); - assertNotNull( cache.retrieve( 2L ) ); - assertNotNull( cache.retrieve( 3L ) ); - assertEquals( valueOf( 5 ), cache.retrieve( 3L ).content() ); - assertNull( cache.retrieve( 4L ) ); + assertNotNull( cache.get( 1L ) ); + assertNotNull( cache.get( 2L ) ); + assertNotNull( cache.get( 3L ) ); + assertEquals( valueOf( 5 ), cache.get( 3L ).content() ); + assertNull( cache.get( 4L ) ); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CommandApplicationProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CommandApplicationProcessTest.java index 990ef7eb1e580..ba45afcad019b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CommandApplicationProcessTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/CommandApplicationProcessTest.java @@ -80,7 +80,7 @@ public class CommandApplicationProcessTest private final int batchSize = 16; private final CoreStateApplier applier = new CoreStateApplier( NullLogProvider.getInstance() ); - private InFlightMap inFlightMap = spy( new InFlightMap<>() ); + private InFlightMap inFlightMap = spy( new InFlightMap<>() ); private final Monitors monitors = new Monitors(); private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class ); private final CommandApplicationProcess applicationProcess = new CommandApplicationProcess( @@ -282,14 +282,14 @@ public void shouldApplyToLogFromCache() throws Throwable // given applicationProcess.start(); - inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); + inFlightMap.put( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); //when applicationProcess.notifyCommitted( 0 ); applier.sync( false ); //then the cache should have had it's get method called. - verify( inFlightMap, times( 1 ) ).retrieve( 0L ); + verify( inFlightMap, times( 1 ) ).get( 0L ); verifyZeroInteractions( raftLog ); } @@ -299,18 +299,18 @@ public void cacheEntryShouldBePurgedWhenApplied() throws Throwable //given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things" applicationProcess.start(); - inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); - inFlightMap.register( 1L, new RaftLogEntry( 0, operation( nullTx ) ) ); - inFlightMap.register( 2L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.put( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.put( 1L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.put( 2L, new RaftLogEntry( 0, operation( nullTx ) ) ); //when applicationProcess.notifyCommitted( 0 ); applier.sync( false ); //then the cache should have had its get method called. - assertNull( inFlightMap.retrieve( 0L ) ); - assertNotNull( inFlightMap.retrieve( 1L ) ); - assertNotNull( inFlightMap.retrieve( 2L ) ); + assertNull( inFlightMap.get( 0L ) ); + assertNotNull( inFlightMap.get( 1L ) ); + assertNotNull( inFlightMap.get( 2L ) ); } @Test @@ -325,8 +325,8 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable ReplicatedContent operation1 = operation( nullTx ); ReplicatedContent operation2 = operation( nullTx ); - inFlightMap.register( 0L, new RaftLogEntry( 0, operation0 ) ); - inFlightMap.register( 2L, new RaftLogEntry( 2, operation2 ) ); + inFlightMap.put( 0L, new RaftLogEntry( 0, operation0 ) ); + inFlightMap.put( 2L, new RaftLogEntry( 2, operation2 ) ); raftLog.append( new RaftLogEntry( 0, operation0 ) ); raftLog.append( new RaftLogEntry( 1, operation1 ) ); @@ -337,9 +337,13 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable applier.sync( false ); - //then the cache should have had its get method called. - verify( inFlightMap, times( 0 ) ).retrieve( 2L ); - verify( inFlightMap, times( 3 ) ).unregister( anyLong() ); //everything is cleaned up + //then the cache stops being used after it finds 1 is missing + verify( inFlightMap, times( 1 ) ).get( 0L ); + verify( inFlightMap, times( 1 ) ).get( 1L ); + verify( inFlightMap, times( 0 ) ).get( 2L ); + + // we only look up 1 so let's remove that from the cache + verify( inFlightMap, times( 1 ) ).remove( 0L ); verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 0L ), anyCallback() ); verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); @@ -354,7 +358,7 @@ public void shouldFailWhenCacheAndLogMiss() throws Throwable //When an entry is not in the log, we must fail. applicationProcess.start(); - inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.put( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); raftLog.append( new RaftLogEntry( 1, operation( nullTx ) ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/InFlightLogEntryReaderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/InFlightLogEntryReaderTest.java index 788c9718f47f3..b083343b95d90 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/InFlightLogEntryReaderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/InFlightLogEntryReaderTest.java @@ -47,7 +47,7 @@ public class InFlightLogEntryReaderTest { private final ReadableRaftLog raftLog = mock( ReadableRaftLog.class ); @SuppressWarnings( "unchecked" ) - private final InFlightMap inFlightMap = mock( InFlightMap.class ); + private final InFlightMap inFlightMap = mock( InFlightMap.class ); private final long logIndex = 42L; private final RaftLogEntry entry = mock( RaftLogEntry.class ); @@ -73,7 +73,7 @@ public void shouldUseTheCacheWhenTheIndexIsPresent() throws Exception // then assertEquals( entry, raftLogEntry ); - verify( inFlightMap ).retrieve( logIndex ); + verify( inFlightMap ).get( logIndex ); assertCacheIsUpdated( inFlightMap, logIndex ); verifyNoMoreInteractions( inFlightMap ); verifyZeroInteractions( raftLog ); @@ -92,9 +92,10 @@ public void shouldUseTheRaftLogWhenTheIndexIsNotPresent() throws Exception // then assertEquals( entry, raftLogEntry ); - verify( inFlightMap ).retrieve( logIndex ); + verify( inFlightMap ).get( logIndex ); verify( raftLog ).getEntryCursor( logIndex ); assertCacheIsUpdated( inFlightMap, logIndex ); + verifyNoMoreInteractions( inFlightMap ); verifyNoMoreInteractions( raftLog ); } @@ -118,7 +119,7 @@ public void shouldNeverUseMapAgainAfterHavingFeltBackToTheRaftLog() throws Excep if ( offset <= 1) { - verify( inFlightMap ).retrieve( offset + logIndex ); + verify( inFlightMap ).get( offset + logIndex ); } if ( offset == 1 ) @@ -133,13 +134,13 @@ public void shouldNeverUseMapAgainAfterHavingFeltBackToTheRaftLog() throws Excep verifyNoMoreInteractions( raftLog ); } - private void startingFromIndexReturnEntries( InFlightMap inFlightMap, long startIndex, + private void startingFromIndexReturnEntries( InFlightMap inFlightMap, long startIndex, RaftLogEntry entry, RaftLogEntry... otherEntries ) throws IOException { - when( inFlightMap.retrieve( startIndex ) ).thenReturn( entry ); + when( inFlightMap.get( startIndex ) ).thenReturn( entry ); for ( int offset = 0; offset < otherEntries.length; offset++ ) { - when( inFlightMap.retrieve( startIndex + offset + 1L ) ).thenReturn( otherEntries[offset] ); + when( inFlightMap.get( startIndex + offset + 1L ) ).thenReturn( otherEntries[offset] ); } } @@ -168,15 +169,15 @@ private void startingFromIndexReturnEntries( ReadableRaftLog raftLog, long start when( cursor.get() ).thenReturn( entry, raftLogEntries ); } - public void assertCacheIsUpdated( InFlightMap inFlightMap, long key ) + public void assertCacheIsUpdated( InFlightMap inFlightMap, long key ) { if ( clearCache ) { - verify( inFlightMap, times( 1 ) ).unregister( key ); + verify( inFlightMap, times( 1 ) ).remove( key ); } else { - verify( inFlightMap, never() ).unregister( key ); + verify( inFlightMap, never() ).remove( key ); } } }