Skip to content

Commit

Permalink
New cache for in-flight Raft entries
Browse files Browse the repository at this point in the history
A highly efficient cache which only can keep a consecutive range of raft log entries
cached, allowing for a very simple and efficient circular buffer to be used. The cache
also allows putting a maximum bound on the amount of entries as well as their total combined
size.

The cache also exposes metrics so that cache hits, misses and total data can be observed.
  • Loading branch information
martinfurmanski committed Nov 14, 2017
1 parent cd4e176 commit b60c9f3
Show file tree
Hide file tree
Showing 39 changed files with 1,851 additions and 198 deletions.
Expand Up @@ -28,6 +28,7 @@
import java.util.function.Function; import java.util.function.Function;
import java.util.logging.Level; import java.util.logging.Level;


import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCacheFactory;
import org.neo4j.configuration.Description; import org.neo4j.configuration.Description;
import org.neo4j.configuration.Internal; import org.neo4j.configuration.Internal;
import org.neo4j.configuration.LoadableConfig; import org.neo4j.configuration.LoadableConfig;
Expand Down Expand Up @@ -124,6 +125,19 @@ public class CausalClusteringSettings implements LoadableConfig
setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ), setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ),
NO_DEFAULT ); NO_DEFAULT );


@Description( "Type of in-flight cache." )
public static final Setting<InFlightCacheFactory.Type> in_flight_cache_type =
setting( "causal_clustering.in_flight_cache.type", options( InFlightCacheFactory.Type.class, true ),
InFlightCacheFactory.Type.CONSECUTIVE.name() );

@Description( "The maximum number of entries in the in-flight cache." )
public static final Setting<Integer> in_flight_cache_max_entries =
setting( "causal_clustering.in_flight_cache.max_entries", INTEGER, "1024" );

@Description( "The maximum number of bytes in the in-flight cache." )
public static final Setting<Long> in_flight_cache_max_bytes =
setting( "causal_clustering.in_flight_cache.max_bytes", BYTES, "2G" );

public enum DiscoveryType public enum DiscoveryType
{ {
DNS, DNS,
Expand Down
Expand Up @@ -23,13 +23,13 @@


import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.EnterpriseCoreEditionModule; import org.neo4j.causalclustering.core.EnterpriseCoreEditionModule;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.InMemoryRaftLog; import org.neo4j.causalclustering.core.consensus.log.InMemoryRaftLog;
import org.neo4j.causalclustering.core.consensus.log.MonitoredRaftLog; import org.neo4j.causalclustering.core.consensus.log.MonitoredRaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLog; import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCacheFactory;
import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategy; import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategy;
import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategyFactory; import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategyFactory;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.causalclustering.core.consensus.log.segmented.SegmentedRaftLog; import org.neo4j.causalclustering.core.consensus.log.segmented.SegmentedRaftLog;
import org.neo4j.causalclustering.core.consensus.membership.MemberIdSetBuilder; import org.neo4j.causalclustering.core.consensus.membership.MemberIdSetBuilder;
import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipManager; import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipManager;
Expand Down Expand Up @@ -73,7 +73,7 @@ public class ConsensusModule
private final RaftMachine raftMachine; private final RaftMachine raftMachine;
private final DelayedRenewableTimeoutService raftTimeoutService; private final DelayedRenewableTimeoutService raftTimeoutService;
private final RaftMembershipManager raftMembershipManager; private final RaftMembershipManager raftMembershipManager;
private final InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>(); private final InFlightCache inFlightCache;


public ConsensusModule( MemberId myself, final PlatformModule platformModule, public ConsensusModule( MemberId myself, final PlatformModule platformModule,
Outbound<MemberId,RaftMessages.RaftMessage> outbound, File clusterStateDirectory, Outbound<MemberId,RaftMessages.RaftMessage> outbound, File clusterStateDirectory,
Expand Down Expand Up @@ -127,17 +127,18 @@ expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_t


life.add( raftMembershipManager ); life.add( raftMembershipManager );


inFlightCache = InFlightCacheFactory.create( config, platformModule.monitors );

RaftLogShippingManager logShipping = RaftLogShippingManager logShipping =
new RaftLogShippingManager( outbound, logProvider, raftLog, systemClock(), myself, new RaftLogShippingManager( outbound, logProvider, raftLog, systemClock(), myself,
raftMembershipManager, electionTimeout, config.get( catchup_batch_size ), raftMembershipManager, electionTimeout, config.get( catchup_batch_size ),
config.get( log_shipping_max_lag ), inFlightMap ); config.get( log_shipping_max_lag ), inFlightCache );


raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider ); raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider );


raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval, raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval,
raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightMap, raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightCache,
RefuseToBeLeaderStrategy.shouldRefuseToBeLeader( config, logProvider.getLog( getClass() ) ), RefuseToBeLeaderStrategy.shouldRefuseToBeLeader( config, logProvider.getLog( getClass() ) ), platformModule.monitors, systemClock() );
platformModule.monitors, systemClock() );


life.add( new RaftCoreTopologyConnector( coreTopologyService, raftMachine ) ); life.add( new RaftCoreTopologyConnector( coreTopologyService, raftMachine ) );


Expand Down Expand Up @@ -195,8 +196,8 @@ public RaftMembershipManager raftMembershipManager()
return raftMembershipManager; return raftMembershipManager;
} }


public InFlightMap<RaftLogEntry> inFlightMap() public InFlightCache inFlightCache()
{ {
return inFlightMap; return inFlightCache;
} }
} }
Expand Up @@ -27,9 +27,8 @@
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Predicate; import java.util.function.Predicate;


import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.RaftLog; import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipManager; import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipManager;
import org.neo4j.causalclustering.core.consensus.outcome.ConsensusOutcome; import org.neo4j.causalclustering.core.consensus.outcome.ConsensusOutcome;
import org.neo4j.causalclustering.core.consensus.outcome.Outcome; import org.neo4j.causalclustering.core.consensus.outcome.Outcome;
Expand Down Expand Up @@ -63,8 +62,8 @@
public class RaftMachine implements LeaderLocator, CoreMetaData public class RaftMachine implements LeaderLocator, CoreMetaData
{ {
private final LeaderNotFoundMonitor leaderNotFoundMonitor; private final LeaderNotFoundMonitor leaderNotFoundMonitor;
private final InFlightMap<RaftLogEntry> inFlightMap;
private RenewableTimeoutService.RenewableTimeout heartbeatTimer; private RenewableTimeoutService.RenewableTimeout heartbeatTimer;
private InFlightCache inFlightCache;


public enum Timeouts implements RenewableTimeoutService.TimeoutName public enum Timeouts implements RenewableTimeoutService.TimeoutName
{ {
Expand Down Expand Up @@ -97,7 +96,7 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateS
RaftLog entryLog, long electionTimeout, long heartbeatInterval, RaftLog entryLog, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService, Outbound<MemberId,RaftMessages.RaftMessage> outbound, RenewableTimeoutService renewableTimeoutService, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping, LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping,
InFlightMap<RaftLogEntry> inFlightMap, boolean refuseToBecomeLeader, Monitors monitors, Clock clock ) InFlightCache inFlightCache, boolean refuseToBecomeLeader, Monitors monitors, Clock clock )
{ {
this.myself = myself; this.myself = myself;
this.electionTimeout = electionTimeout; this.electionTimeout = electionTimeout;
Expand All @@ -113,8 +112,8 @@ public RaftMachine( MemberId myself, StateStorage<TermState> termStorage, StateS
this.refuseToBecomeLeader = refuseToBecomeLeader; this.refuseToBecomeLeader = refuseToBecomeLeader;
this.clock = clock; this.clock = clock;


this.inFlightMap = inFlightMap; this.inFlightCache = inFlightCache;
this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightMap, this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightCache,
logProvider ); logProvider );


leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class ); leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );
Expand All @@ -136,7 +135,7 @@ public synchronized void postRecoveryActions()
renewing( () -> handle( new RaftMessages.Timeout.Heartbeat( myself ) ) ) ); renewing( () -> handle( new RaftMessages.Timeout.Heartbeat( myself ) ) ) );
} }


inFlightMap.enable(); inFlightCache.enable();
} }


public synchronized void stopTimers() public synchronized void stopTimers()
Expand Down
@@ -0,0 +1,180 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus.log.cache;

import static java.lang.Math.floorMod;
import static java.util.Arrays.fill;

/**
* <pre>
* Design
*
* S: start index
* E: end index
*
* When S == E the buffer is empty.
*
* Examples:
*
* S
* v
* Empty [ | | | | | | ]
* ^
* E
*
*
* S
* v
* Size 2 [ | | | | | | ]
* ^
* E
*
*
* S
* v
* Full [ | | | | | | ]
* ^
* E
*
* New items are put at the current E, and then E is moved one step forward (circularly).
* The item at E is never a valid item.
*
* If moving E one step forward moves it onto S
* - then it knocks an element out
* - and S is also moved one step forward
*
* The S element has index 0.
* Removing an element moves S forward (circularly).
*
* @param <V> type of elements.
*/
public class CircularBuffer<V>
{
private final int arraySize; // externally visible capacity is arraySize - 1
private Object[] elementArr;

private int S;
private int E;

CircularBuffer( int capacity )
{
if ( capacity <= 0 )
{
throw new IllegalArgumentException( "Capacity must be > 0." );
}

this.arraySize = capacity + 1; // 1 item as sentinel (can't hold entries)
this.elementArr = new Object[arraySize];
}

/**
* Clears the underlying buffer and fills the provided eviction array with all evicted elements.
* The provided array must have the same capacity as the circular buffer.
*
* @param evictions Caller-provided array for evictions.
*/
public void clear( V[] evictions )
{
if ( evictions.length != arraySize - 1 )
{
throw new IllegalArgumentException( "The eviction array must be of the same size as the capacity of the circular buffer." );
}

int i = 0;
while ( S != E )
{
//noinspection unchecked
evictions[i++] = (V) elementArr[S];
S = pos( S, 1 );
}

S = 0;
E = 0;

fill( elementArr, null );
}

private int pos( int base, int delta )
{
return floorMod( base + delta, arraySize );
}

/**
* Append to the end of the buffer, possibly overwriting the
* oldest entry.
*
* @return any knocked out item, or null if nothing was knocked out.
*/
public V append( V e )
{
elementArr[E] = e;
E = pos( E, 1 );
if ( E == S )
{
//noinspection unchecked
V old = (V) elementArr[E];
elementArr[E] = null;
S = pos( S, 1 );
return old;
}
else
{
return null;
}
}

public V read( int idx )
{
//noinspection unchecked
return (V) elementArr[pos( S, idx )];
}

public V remove()
{
if ( S == E )
{
return null;
}
//noinspection unchecked
V e = (V) elementArr[S];
elementArr[S] = null;
S = pos( S, 1 );
return e;
}

public V removeHead()
{
if ( S == E )
{
return null;
}

E = pos( E, -1 );
//noinspection unchecked
V e = (V) elementArr[E];
elementArr[E] = null;
return e;
}

public int size()
{
return floorMod( E - S, arraySize );
}
}

0 comments on commit b60c9f3

Please sign in to comment.