Skip to content

Commit

Permalink
Change the API of FlightMap so that we have separate methods for
Browse files Browse the repository at this point in the history
truncating multiple entries and removing an individual entry

This will solve the problem where if we have gaps in the map
then some entries will never be removed.
  • Loading branch information
Mark Needham committed Oct 18, 2016
1 parent a245607 commit 7783bb3
Show file tree
Hide file tree
Showing 20 changed files with 143 additions and 109 deletions.
Expand Up @@ -73,7 +73,7 @@ public class ConsensusModule
private final RaftMachine raftMachine; private final RaftMachine raftMachine;
private final DelayedRenewableTimeoutService raftTimeoutService; private final DelayedRenewableTimeoutService raftTimeoutService;
private final RaftMembershipManager raftMembershipManager; private final RaftMembershipManager raftMembershipManager;
private final InFlightMap<Long,RaftLogEntry> inFlightMap = new InFlightMap<>(); private final InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>();


public ConsensusModule( MemberId myself, final PlatformModule platformModule, Outbound<MemberId,RaftMessages.RaftMessage> outbound, public ConsensusModule( MemberId myself, final PlatformModule platformModule, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
File clusterStateDirectory, CoreTopologyService discoveryService ) File clusterStateDirectory, CoreTopologyService discoveryService )
Expand Down Expand Up @@ -193,7 +193,7 @@ public RaftMembershipManager raftMembershipManager()
return raftMembershipManager; return raftMembershipManager;
} }


public InFlightMap<Long,RaftLogEntry> inFlightMap() public InFlightMap<RaftLogEntry> inFlightMap()
{ {
return inFlightMap; return inFlightMap;
} }
Expand Down
Expand Up @@ -97,7 +97,7 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage,
Outbound<MemberId, RaftMessages.RaftMessage> outbound, Outbound<MemberId, RaftMessages.RaftMessage> outbound,
LogProvider logProvider, RaftMembershipManager membershipManager, LogProvider logProvider, RaftMembershipManager membershipManager,
RaftLogShippingManager logShipping, RaftLogShippingManager logShipping,
InFlightMap<Long, RaftLogEntry> inFlightMap, InFlightMap<RaftLogEntry> inFlightMap,
Monitors monitors ) Monitors monitors )
{ {
this.myself = myself; this.myself = myself;
Expand Down
Expand Up @@ -19,14 +19,14 @@
*/ */
package org.neo4j.coreedge.core.consensus.log.segmented; package org.neo4j.coreedge.core.consensus.log.segmented;


import java.util.Map; import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap;


import static java.lang.String.format; import static java.lang.String.format;


public class InFlightMap<K, V> public class InFlightMap<V>
{ {
private final Map<K,V> map = new ConcurrentHashMap<>(); private final SortedMap<Long,V> map = new ConcurrentSkipListMap<>();


/** /**
* Adds a new mapping. * Adds a new mapping.
Expand All @@ -35,7 +35,7 @@ public class InFlightMap<K, V>
* @param value The value corresponding to the key provided. * @param value The value corresponding to the key provided.
* @throws IllegalArgumentException if a mapping for the key already exists * @throws IllegalArgumentException if a mapping for the key already exists
*/ */
public void register( K key, V value ) public void put( Long key, V value )
{ {
V previousValue = map.putIfAbsent( key, value ); V previousValue = map.putIfAbsent( key, value );


Expand All @@ -52,19 +52,35 @@ public void register( K key, V value )
* @param key The key to use for retrieving the value from the map * @param key The key to use for retrieving the value from the map
* @return the value for this key, otherwise null. * @return the value for this key, otherwise null.
*/ */
public V retrieve( K key ) public V get( Long key )
{ {
return map.get( key ); return map.get( key );
} }


/** /**
* Attempts to unregister this object from the map. * Attempts to remove this object from the map.
* *
* @param key The object to attempt unregistering. * @param key The object to attempt unregistering.
* @return true if the attempt to unregister was successful, otherwise false if this object was not found. * @return true if the attempt to unregister was successful, otherwise false if this object was not found.
*/ */
public boolean unregister( K key ) public boolean remove( Long key )
{ {
return map.remove( key ) != null; return map.remove( key ) != null;
} }

/**
* Attempts to remove all objects at this key or higher from the map.
*
* @param key The object to attempt unregistering.
*/
public void truncate( Long key )
{
map.tailMap( key ).keySet().forEach( map::remove );
}

@Override
public String toString()
{
return String.format( "InFlightMap{map=%s}", map );
}
} }
Expand Up @@ -49,9 +49,9 @@ public void applyTo( RaftLog raftLog, Log log ) throws IOException
} }


@Override @Override
public void applyTo( InFlightMap<Long, RaftLogEntry> inFlightMap, Log log ) throws IOException public void applyTo( InFlightMap<RaftLogEntry> inFlightMap, Log log ) throws IOException
{ {
inFlightMap.register( index, entry ); inFlightMap.put( index, entry );
} }


@Override @Override
Expand Down
Expand Up @@ -62,11 +62,11 @@ public void applyTo( RaftLog raftLog, Log log ) throws IOException
} }


@Override @Override
public void applyTo( InFlightMap<Long, RaftLogEntry> inFlightMap, Log log ) public void applyTo( InFlightMap<RaftLogEntry> inFlightMap, Log log )
{ {
for ( int i = offset; i < entries.length; i++ ) for ( int i = offset; i < entries.length; i++ )
{ {
inFlightMap.register( baseIndex + i , entries[i]); inFlightMap.put( baseIndex + i , entries[i]);
} }
} }


Expand Down
Expand Up @@ -38,5 +38,5 @@ interface Handler


void applyTo( RaftLog raftLog, Log log ) throws IOException; void applyTo( RaftLog raftLog, Log log ) throws IOException;


void applyTo( InFlightMap<Long,RaftLogEntry> inFlightMap, Log log ) throws IOException; void applyTo( InFlightMap<RaftLogEntry> inFlightMap, Log log ) throws IOException;
} }
Expand Up @@ -49,15 +49,10 @@ public void applyTo( RaftLog raftLog, Log log ) throws IOException
} }


@Override @Override
public void applyTo( InFlightMap<Long,RaftLogEntry> inFlightMap, Log log ) throws IOException public void applyTo( InFlightMap<RaftLogEntry> inFlightMap, Log log ) throws IOException
{ {
long truncateIndex = fromIndex; log.debug( "Start truncating in-flight-map from index %d. Current map:%n%s", fromIndex, inFlightMap );
log.debug( "Start truncating in-flight-map from index %d. Current map:%n%s", truncateIndex, inFlightMap ); inFlightMap.truncate( fromIndex );
while ( inFlightMap.unregister( truncateIndex ) )
{
truncateIndex++;
}
log.debug( "End truncating in-flight-map at index %d", truncateIndex - 1 );
} }


@Override @Override
Expand Down
Expand Up @@ -111,7 +111,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private final long retryTimeMillis; private final long retryTimeMillis;
private final int catchupBatchSize; private final int catchupBatchSize;
private final int maxAllowedShippingLag; private final int maxAllowedShippingLag;
private final InFlightMap<Long, RaftLogEntry> inFlightMap; private final InFlightMap<RaftLogEntry> inFlightMap;


private DelayedRenewableTimeoutService timeoutService; private DelayedRenewableTimeoutService timeoutService;
private RenewableTimeout timeout; private RenewableTimeout timeout;
Expand All @@ -124,7 +124,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
RaftLogShipper( Outbound<MemberId, RaftMessages.RaftMessage> outbound, LogProvider logProvider, RaftLogShipper( Outbound<MemberId, RaftMessages.RaftMessage> outbound, LogProvider logProvider,
ReadableRaftLog raftLog, Clock clock, ReadableRaftLog raftLog, Clock clock,
MemberId leader, MemberId follower, long leaderTerm, long leaderCommit, long retryTimeMillis, MemberId leader, MemberId follower, long leaderTerm, long leaderCommit, long retryTimeMillis,
int catchupBatchSize, int maxAllowedShippingLag, InFlightMap<Long, RaftLogEntry> inFlightMap ) int catchupBatchSize, int maxAllowedShippingLag, InFlightMap<RaftLogEntry> inFlightMap )
{ {
this.outbound = outbound; this.outbound = outbound;
this.catchupBatchSize = catchupBatchSize; this.catchupBatchSize = catchupBatchSize;
Expand Down
Expand Up @@ -51,7 +51,7 @@ public class RaftLogShippingManager extends LifecycleAdapter implements RaftMemb
private final long retryTimeMillis; private final long retryTimeMillis;
private final int catchupBatchSize; private final int catchupBatchSize;
private final int maxAllowedShippingLag; private final int maxAllowedShippingLag;
private final InFlightMap<Long,RaftLogEntry> inFlightMap; private final InFlightMap<RaftLogEntry> inFlightMap;


private Map<MemberId,RaftLogShipper> logShippers = new HashMap<>(); private Map<MemberId,RaftLogShipper> logShippers = new HashMap<>();
private LeaderContext lastLeaderContext; private LeaderContext lastLeaderContext;
Expand All @@ -63,7 +63,7 @@ public RaftLogShippingManager( Outbound<MemberId,RaftMessages.RaftMessage> outbo
ReadableRaftLog raftLog, ReadableRaftLog raftLog,
Clock clock, MemberId myself, RaftMembership membership, long retryTimeMillis, Clock clock, MemberId myself, RaftMembership membership, long retryTimeMillis,
int catchupBatchSize, int maxAllowedShippingLag, int catchupBatchSize, int maxAllowedShippingLag,
InFlightMap<Long, RaftLogEntry> inFlightMap ) InFlightMap<RaftLogEntry> inFlightMap )
{ {
this.outbound = outbound; this.outbound = outbound;
this.logProvider = logProvider; this.logProvider = logProvider;
Expand Down
Expand Up @@ -46,7 +46,7 @@ public class RaftState implements ReadableRaftState
private final RaftMembership membership; private final RaftMembership membership;
private final Log log; private final Log log;
private final RaftLog entryLog; private final RaftLog entryLog;
private final InFlightMap<Long,RaftLogEntry> inFlightMap; private final InFlightMap<RaftLogEntry> inFlightMap;


private TermState termState; private TermState termState;
private VoteState voteState; private VoteState voteState;
Expand All @@ -63,7 +63,7 @@ public RaftState( MemberId myself,
RaftMembership membership, RaftMembership membership,
RaftLog entryLog, RaftLog entryLog,
StateStorage<VoteState> voteStorage, StateStorage<VoteState> voteStorage,
InFlightMap<Long, RaftLogEntry> inFlightMap, LogProvider logProvider ) InFlightMap<RaftLogEntry> inFlightMap, LogProvider logProvider )
{ {
this.myself = myself; this.myself = myself;
this.termStorage = termStorage; this.termStorage = termStorage;
Expand Down
Expand Up @@ -56,7 +56,7 @@ public class CommandApplicationProcess extends LifecycleAdapter
private final ProgressTracker progressTracker; private final ProgressTracker progressTracker;
private final SessionTracker sessionTracker; private final SessionTracker sessionTracker;
private final Supplier<DatabaseHealth> dbHealth; private final Supplier<DatabaseHealth> dbHealth;
private final InFlightMap<Long,RaftLogEntry> inFlightMap; private final InFlightMap<RaftLogEntry> inFlightMap;
private final Log log; private final Log log;
private final CoreStateApplier applier; private final CoreStateApplier applier;
private final RaftLogCommitIndexMonitor commitIndexMonitor; private final RaftLogCommitIndexMonitor commitIndexMonitor;
Expand All @@ -81,7 +81,7 @@ public CommandApplicationProcess(
StateStorage<Long> lastFlushedStorage, StateStorage<Long> lastFlushedStorage,
SessionTracker sessionTracker, SessionTracker sessionTracker,
CoreStateApplier applier, CoreStateApplier applier,
InFlightMap<Long, RaftLogEntry> inFlightMap, InFlightMap<RaftLogEntry> inFlightMap,
Monitors monitors ) Monitors monitors )
{ {
this.coreStateMachines = coreStateMachines; this.coreStateMachines = coreStateMachines;
Expand Down
Expand Up @@ -31,13 +31,13 @@
public class InFlightLogEntryReader implements AutoCloseable public class InFlightLogEntryReader implements AutoCloseable
{ {
private final ReadableRaftLog raftLog; private final ReadableRaftLog raftLog;
private final InFlightMap<Long, RaftLogEntry> inFlightMap; private final InFlightMap<RaftLogEntry> inFlightMap;
private final boolean removeReadIndexFromMap; private final boolean removeReadIndexFromMap;


private RaftLogCursor cursor; private RaftLogCursor cursor;
private boolean useInFlightMap = true; private boolean useInFlightMap = true;


public InFlightLogEntryReader( ReadableRaftLog raftLog, InFlightMap<Long,RaftLogEntry> inFlightMap, public InFlightLogEntryReader( ReadableRaftLog raftLog, InFlightMap<RaftLogEntry> inFlightMap,
boolean removeReadIndexFromMap ) boolean removeReadIndexFromMap )
{ {
this.raftLog = raftLog; this.raftLog = raftLog;
Expand All @@ -51,7 +51,7 @@ public RaftLogEntry get( long logIndex ) throws IOException


if ( useInFlightMap ) if ( useInFlightMap )
{ {
entry = inFlightMap.retrieve( logIndex ); entry = inFlightMap.get( logIndex );
} }


if ( entry == null ) if ( entry == null )
Expand All @@ -62,7 +62,7 @@ public RaftLogEntry get( long logIndex ) throws IOException


if ( removeReadIndexFromMap ) if ( removeReadIndexFromMap )
{ {
inFlightMap.unregister( logIndex ); inFlightMap.remove( logIndex );
} }


return entry; return entry;
Expand Down
Expand Up @@ -77,7 +77,7 @@ public class RaftMachineBuilder
new InMemoryStateStorage<>( new RaftMembershipState() ); new InMemoryStateStorage<>( new RaftMembershipState() );
private Monitors monitors = new Monitors(); private Monitors monitors = new Monitors();
private CommitListener commitListener = commitIndex -> {}; private CommitListener commitListener = commitIndex -> {};
private final InFlightMap<Long, RaftLogEntry> inFlightMap; private final InFlightMap<RaftLogEntry> inFlightMap;


public RaftMachineBuilder( MemberId member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder ) public RaftMachineBuilder( MemberId member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder )
{ {
Expand Down
Expand Up @@ -53,11 +53,11 @@ public class ComparableRaftState implements ReadableRaftState
private long lastLogIndexBeforeWeBecameLeader = -1; private long lastLogIndexBeforeWeBecameLeader = -1;
private FollowerStates followerStates = new FollowerStates<>(); private FollowerStates followerStates = new FollowerStates<>();
protected final RaftLog entryLog; protected final RaftLog entryLog;
private final InFlightMap<Long,RaftLogEntry> inFlightMap; private final InFlightMap<RaftLogEntry> inFlightMap;
private long commitIndex = -1; private long commitIndex = -1;


ComparableRaftState( MemberId myself, Set votingMembers, Set replicationMembers, ComparableRaftState( MemberId myself, Set votingMembers, Set replicationMembers,
RaftLog entryLog, InFlightMap<Long, RaftLogEntry> inFlightMap, LogProvider logProvider ) RaftLog entryLog, InFlightMap<RaftLogEntry> inFlightMap, LogProvider logProvider )
{ {
this.myself = myself; this.myself = myself;
this.votingMembers = votingMembers; this.votingMembers = votingMembers;
Expand Down
Expand Up @@ -21,8 +21,8 @@


import org.junit.Test; import org.junit.Test;


import java.util.LinkedList; import java.util.HashMap;
import java.util.List; import java.util.Map;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
Expand All @@ -34,61 +34,58 @@ public class InFlightLogEntriesCacheTest
@Test @Test
public void shouldRegisterAndUnregisterValues() throws Exception public void shouldRegisterAndUnregisterValues() throws Exception
{ {
InFlightMap<Object,Object> entries = new InFlightMap<>(); InFlightMap<Object> entries = new InFlightMap<>();


List<Object> logEntryList = new LinkedList<>(); Map<Long, Object> logEntryList = new HashMap<>();
logEntryList.add( new Object() ); logEntryList.put(1L, new Object() );


for ( Object registeredEntry : logEntryList ) for ( Map.Entry<Long, Object> entry : logEntryList.entrySet() )
{ {
entries.register( registeredEntry, registeredEntry ); entries.put( entry.getKey(), entry.getValue() );
} }


for ( Object expected : logEntryList ) for ( Map.Entry<Long, Object> entry : logEntryList.entrySet() )
{ {
Object retrieved = entries.retrieve( expected ); Object retrieved = entries.get( entry.getKey() );
assertEquals( expected, retrieved ); assertEquals( entry.getValue(), retrieved );
} }


Object unexpected = new Object(); Long unexpected = 2L;
Object shouldBeNull = entries.retrieve( unexpected ); Object shouldBeNull = entries.get( unexpected );
assertNull( shouldBeNull ); assertNull( shouldBeNull );


for ( Object expected : logEntryList ) for ( Map.Entry<Long, Object> entry : logEntryList.entrySet() )
{ {
boolean wasThere = entries.unregister( expected ); boolean wasThere = entries.remove( entry.getKey() );
assertEquals( true, wasThere ); assertEquals( true, wasThere );
} }

boolean shouldBeFalse = entries.unregister( unexpected );
assertFalse( shouldBeFalse );
} }


@Test( expected = IllegalArgumentException.class ) @Test( expected = IllegalArgumentException.class )
public void shouldNotReinsertValues() throws Exception public void shouldNotReinsertValues() throws Exception
{ {
InFlightMap<Object,Object> entries = new InFlightMap<>(); InFlightMap<Object> entries = new InFlightMap<>();
Object addedObject = new Object(); Object addedObject = new Object();
entries.register( addedObject, addedObject ); entries.put( 1L, addedObject );
entries.register( addedObject, addedObject ); entries.put( 1L, addedObject );
} }


@Test @Test
public void shouldNotReplaceRegisteredValues() throws Exception public void shouldNotReplaceRegisteredValues() throws Exception
{ {
InFlightMap<Object,Object> cache = new InFlightMap<>(); InFlightMap<Object> cache = new InFlightMap<>();
Object first = new Object(); Object first = new Object();
Object second = new Object(); Object second = new Object();


try try
{ {
cache.register( first, first ); cache.put( 1L, first );
cache.register( first, second ); cache.put( 1L, second );
fail("Should not allow silent replacement of values"); fail("Should not allow silent replacement of values");
} }
catch ( IllegalArgumentException e ) catch ( IllegalArgumentException e )
{ {
assertEquals( first, cache.retrieve( first ) ); assertEquals( first, cache.get( 1L ) );
} }
} }
} }
Expand Up @@ -90,14 +90,14 @@ public void applyTo() throws Exception


BatchAppendLogEntries batchAppend = new BatchAppendLogEntries( baseIndex, offset, entries ); BatchAppendLogEntries batchAppend = new BatchAppendLogEntries( baseIndex, offset, entries );


InFlightMap<Long,RaftLogEntry> cache = new InFlightMap<>(); InFlightMap<RaftLogEntry> cache = new InFlightMap<>();


//when //when
batchAppend.applyTo( cache, log ); batchAppend.applyTo( cache, log );


//then //then
assertNull( cache.retrieve( 0L ) ); assertNull( cache.get( 0L ) );
assertNotNull( cache.retrieve( 1L ) ); assertNotNull( cache.get( 1L ) );
assertNotNull( cache.retrieve( 2L ) ); assertNotNull( cache.get( 2L ) );
} }
} }

0 comments on commit 7783bb3

Please sign in to comment.