Skip to content

Commit

Permalink
Make catchup inactivity timeout configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Oct 12, 2016
1 parent 3748c5c commit 220d7f5
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 29 deletions.
Expand Up @@ -31,50 +31,47 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.discovery.NoKnownAddressesException;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.neo4j.coreedge.catchup.TimeoutLoop.waitForCompletion;

public class CatchUpClient extends LifecycleAdapter
{
private static final int DEFAULT_INACTIVITY_TIMEOUT = 5; // seconds

private final LogProvider logProvider;
private final TopologyService discoveryService;
private final Log log;
private final Clock clock;
private final Monitors monitors;
private final long inactivityTimeoutMillis;
private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new );

private NioEventLoopGroup eventLoopGroup;

public CatchUpClient( TopologyService discoveryService, LogProvider logProvider, Clock clock, Monitors monitors )
public CatchUpClient( TopologyService discoveryService, LogProvider logProvider, Clock clock,
long inactivityTimeoutMillis, Monitors monitors )
{
this.logProvider = logProvider;
this.discoveryService = discoveryService;
this.log = logProvider.getLog( getClass() );
this.clock = clock;
this.inactivityTimeoutMillis = inactivityTimeoutMillis;
this.monitors = monitors;
}

public <T> T makeBlockingRequest( MemberId target, CatchUpRequest request,
CatchUpResponseCallback<T> responseHandler ) throws CatchUpClientException, NoKnownAddressesException
{
return makeBlockingRequest( target, request, DEFAULT_INACTIVITY_TIMEOUT, TimeUnit.SECONDS, responseHandler );
}

private <T> T makeBlockingRequest( MemberId target, CatchUpRequest request, long inactivityTimeout,
TimeUnit timeUnit, CatchUpResponseCallback<T> responseHandler )
throws CatchUpClientException, NoKnownAddressesException
{
CompletableFuture<T> future = new CompletableFuture<>();
AdvertisedSocketAddress catchUpAddress =
Expand All @@ -97,8 +94,7 @@ private <T> T makeBlockingRequest( MemberId target, CatchUpRequest request, long

String operation = String.format( "Timed out executing operation %s on %s", request, target );

return TimeoutLoop.waitForCompletion( future, operation,
channel::millisSinceLastResponse, inactivityTimeout, timeUnit );
return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis );
}

private class CatchUpChannel implements CatchUpChannelPool.Channel
Expand Down
Expand Up @@ -28,9 +28,9 @@
class TimeoutLoop
{
static <T> T waitForCompletion( Future<T> future, String operation, Supplier<Long> millisSinceLastResponseSupplier,
long inactivityTimeout, TimeUnit timeUnit ) throws CatchUpClientException
long inactivityTimeoutMillis ) throws CatchUpClientException
{
long remainingTimeoutMillis = timeUnit.toMillis( inactivityTimeout );
long remainingTimeoutMillis = inactivityTimeoutMillis;
while ( true )
{
try
Expand All @@ -49,9 +49,9 @@ static <T> T waitForCompletion( Future<T> future, String operation, Supplier<Lon
catch ( TimeoutException e )
{
long millisSinceLastResponse = millisSinceLastResponseSupplier.get();
if ( millisSinceLastResponse < timeUnit.toMillis( inactivityTimeout ) )
if ( millisSinceLastResponse < inactivityTimeoutMillis )
{
remainingTimeoutMillis = timeUnit.toMillis( inactivityTimeout ) - millisSinceLastResponse;
remainingTimeoutMillis = inactivityTimeoutMillis - millisSinceLastResponse;
}
else
{
Expand Down
Expand Up @@ -171,6 +171,12 @@ public class CoreEdgeClusterSettings
@Description( "Interval of pulling updates from cores." )
public static final Setting<Long> pull_interval = setting( "core_edge.pull_interval", DURATION, "1s" );

@Description( "The catch up protocol times out if the given duration elapses with not network activity. " +
"Every message received by the client from the server extends the time out duration." )
@Internal
public static final Setting<Long> catch_up_client_inactivity_timeout =
setting( "core_edge.catch_up_client_inactivity_timeout", DURATION, "5s" );

@Description("Throttle limit for logging unknown cluster member address")
public static final Setting<Long> unknown_address_logging_throttle =
setting( "core_edge.unknown_address_logging_throttle", DURATION, "10000ms" );
Expand Down
Expand Up @@ -114,8 +114,9 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
LoggingInbound<RaftMessages.ClusterIdAwareMessage> loggingRaftInbound =
new LoggingInbound<>( raftServer, messageLogger, identityModule.myself() );

long inactivityTimeoutMillis = config.get( CoreEdgeClusterSettings.catch_up_client_inactivity_timeout );
CatchUpClient catchUpClient = life.add( new CatchUpClient( clusteringModule.topologyService(), logProvider,
Clocks.systemClock(), monitors ) );
Clocks.systemClock(), inactivityTimeoutMillis, monitors ) );

StoreFetcher storeFetcher = new StoreFetcher( logProvider, fileSystem, platformModule.pageCache,
new StoreCopyClient( catchUpClient ), new TxPullClient( catchUpClient, platformModule.monitors ),
Expand Down
Expand Up @@ -172,8 +172,9 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
logProvider, refreshEdgeTimeoutService, edgeTimeToLiveTimeout, edgeRefreshRate );
life.add( dependencies.satisfyDependency( discoveryService ) );

Clock clock = Clocks.systemClock();
CatchUpClient catchUpClient = life.add( new CatchUpClient( discoveryService, logProvider, clock, monitors ) );
long inactivityTimeoutMillis = config.get( CoreEdgeClusterSettings.catch_up_client_inactivity_timeout );
CatchUpClient catchUpClient = life.add( new CatchUpClient( discoveryService, logProvider, Clocks.systemClock(),
inactivityTimeoutMillis, monitors ) );

final Supplier<DatabaseHealth> databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class );

Expand All @@ -189,8 +190,8 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
ContinuousJob txApplyJob = new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group(
"tx-applier", NEW_THREAD ), batchingTxApplier, logProvider );

DelayedRenewableTimeoutService txPullerTimeoutService = new DelayedRenewableTimeoutService( clock
, logProvider );
DelayedRenewableTimeoutService txPullerTimeoutService =
new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider );

LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir,
new StoreFiles( fileSystem ),
Expand Down
Expand Up @@ -19,15 +19,13 @@
*/
package org.neo4j.coreedge.catchup;

import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.junit.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
Expand All @@ -46,7 +44,7 @@ public void shouldReturnImmediatelyIfFutureIsAlreadyComplete() throws Exception
Supplier<Long> lastResponseSupplier = () -> 1L;

// when
long value = TimeoutLoop.<Long>waitForCompletion( future, "", lastResponseSupplier, 2, MILLISECONDS );
long value = TimeoutLoop.<Long>waitForCompletion( future, "", lastResponseSupplier, 2 );

// then
assertEquals( 12L, value );
Expand All @@ -65,7 +63,7 @@ public void shouldTimeoutIfNoActivity() throws Exception
try
{
// when
TimeoutLoop.<Long>waitForCompletion( future, "", lastResponseSupplier, 1, MILLISECONDS );
TimeoutLoop.<Long>waitForCompletion( future, "", lastResponseSupplier, 1 );
fail( "Should have timed out" );
}
catch ( CatchUpClientException e )
Expand All @@ -86,7 +84,7 @@ public void shouldKeepWaitingIfThereIsSomeActivity() throws Exception
Supplier<Long> lastResponseSupplier = () -> 1L;

// when
long value = TimeoutLoop.<Long>waitForCompletion( future, "", lastResponseSupplier, 2, MILLISECONDS );
long value = TimeoutLoop.<Long>waitForCompletion( future, "", lastResponseSupplier, 2 );

// then
assertEquals( 12L, value );
Expand All @@ -102,7 +100,7 @@ public void shouldTranslateExecutionExceptionToCatchUpClientException() throws E
// when
try
{
TimeoutLoop.<Long>waitForCompletion( future, "", () -> 1L, 2, MILLISECONDS );
TimeoutLoop.<Long>waitForCompletion( future, "", () -> 1L, 2 );
fail( "Should have thrown exception" );
}
catch ( CatchUpClientException e )
Expand Down

0 comments on commit 220d7f5

Please sign in to comment.