Replace Throttler
The new throttler resides in the ReconnectingChannel and uses Netty's isWritable to pause writing when necessary.
RagnarW authored and martinfurmanski committed Sep 10, 2018
1 parent 01c6be3 commit 1cfb229
Showing 11 changed files with 186 additions and 404 deletions.
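For context, the pattern this commit adopts can be sketched as follows. This is an illustrative sketch only, with hypothetical names (WritabilityGate, install) that are not part of the commit: a pipeline handler mirrors Netty's channelWritabilityChanged notifications into a gate, and threads that want to write block on that gate while the channel's outbound buffer is saturated.

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

class WritabilityGate
{
    // Guards writers against an unwritable channel; flipped from the event loop.
    private boolean writable = true;

    synchronized void awaitWritable() throws InterruptedException
    {
        while ( !writable )
        {
            wait();
        }
    }

    synchronized void setWritable( boolean writable )
    {
        this.writable = writable;
        if ( writable )
        {
            notifyAll();
        }
    }

    // Mirrors the channel's current and future writability into the gate.
    void install( Channel channel )
    {
        setWritable( channel.isWritable() );
        channel.pipeline().addFirst( new ChannelInboundHandlerAdapter()
        {
            @Override
            public void channelWritabilityChanged( ChannelHandlerContext ctx ) throws Exception
            {
                setWritable( ctx.channel().isWritable() );
                super.channelWritabilityChanged( ctx );
            }
        } );
    }
}

The commit's own implementation (WritabilityThrottler and the handler registered in ReconnectingChannel.start()) appears in the hunks below.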
@@ -72,7 +72,6 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession );
progressTracker = new ProgressTrackerImpl( myGlobalSession );

long replicationLimit = config.get( CausalClusteringSettings.replication_total_size_limit );
Duration initialBackoff = config.get( CausalClusteringSettings.replication_retry_timeout_base );
Duration upperBoundBackoff = config.get( CausalClusteringSettings.replication_retry_timeout_limit );
Duration leaderBackoff = config.get( CausalClusteringSettings.replication_leader_retry_timeout );
@@ -89,7 +88,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
progressRetryStrategy,
leaderRetryStrategy,
availabilityTimeoutMillis,
globalAvailabilityGuard, logProvider, replicationLimit, platformModule.monitors );
globalAvailabilityGuard, logProvider, platformModule.monitors );
}

public RaftReplicator getReplicator()
@@ -275,10 +275,6 @@ public HostnameResolver getHostnameResolver( LogProvider logProvider, LogProvide
public static final Setting<Integer> replicated_lock_token_state_size =
setting( "causal_clustering.replicated_lock_token_state_size", INTEGER, "1000" );

@Description( "The maximum amount of data which can be in the replication stage concurrently." )
public static final Setting<Long> replication_total_size_limit =
setting( "causal_clustering.replication_total_size_limit", BYTES, "128M" );

@Description( "The initial timeout until replication is retried. The timeout will increase exponentially." )
public static final Setting<Duration> replication_retry_timeout_base =
setting( "causal_clustering.replication_retry_timeout_base", DURATION, "10s" );
@@ -56,13 +56,12 @@ public class RaftReplicator implements Replicator, LeaderListener
private final LeaderLocator leaderLocator;
private final TimeoutStrategy leaderTimeoutStrategy;
private final Log log;
private final Throttler throttler;
private final ReplicationMonitor replicationMonitor;
private final long availabilityTimeoutMillis;

public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, TimeoutStrategy leaderTimeoutStrategy, long availabilityTimeoutMillis,
AvailabilityGuard availabilityGuard, LogProvider logProvider, long replicationLimit, Monitors monitors )
AvailabilityGuard availabilityGuard, LogProvider logProvider, Monitors monitors )
{
this.me = me;
this.outbound = outbound;
@@ -72,7 +71,6 @@ public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<Member
this.leaderTimeoutStrategy = leaderTimeoutStrategy;
this.availabilityTimeoutMillis = availabilityTimeoutMillis;
this.availabilityGuard = availabilityGuard;
this.throttler = new Throttler( replicationLimit );
this.leaderLocator = leaderLocator;
leaderLocator.registerListener( this );
log = logProvider.getLog( getClass() );
@@ -91,31 +89,14 @@ public Future<Object> replicate( ReplicatedContent command, boolean trackResult
{
throw new ReplicationFailureException( "Replication aborted since no leader was available", e );
}

if ( command.size().isPresent() )
{
try
{
return throttler.invoke( () -> replicate0( command, trackResult, originalLeader ), command.size().get() );
}
catch ( InterruptedException e )
{
throw new ReplicationFailureException( "Interrupted while waiting for replication credits", e );
}
}
else
{
return replicate0( command, trackResult, originalLeader );
}
return replicate0( command, trackResult, originalLeader );
}

private Future<Object> replicate0( ReplicatedContent command, boolean trackResult, MemberId leader ) throws ReplicationFailureException
{
replicationMonitor.startReplication();
try
{
assertNoLeaderSwitch( leader );

OperationContext session = sessionPool.acquireSession();

DistributedOperation operation = new DistributedOperation( command, session.globalSession(), session.localOperationId() );
@@ -126,7 +107,7 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResul
int attempts = 0;
try
{
do
while ( true )
{
attempts++;
if ( attempts > 1 )
@@ -139,10 +120,11 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResul
{
// blocking at least until the send has succeeded or failed before retrying
outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ), true );

leaderTimeout = leaderTimeoutStrategy.newTimeout();

progress.awaitReplication( progressTimeout.getMillis() );
if ( progress.isReplicated() )
{
break;
}
progressTimeout.increment();
leader = leaderLocator.getLeader();
}
@@ -153,7 +135,6 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResul
leaderTimeout.increment();
}
}
while ( !progress.isReplicated() );
}
catch ( InterruptedException e )
{
@@ -182,24 +163,6 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResul

}

private void assertNoLeaderSwitch( MemberId originalLeader ) throws ReplicationFailureException
{
MemberId currentLeader;
try
{
currentLeader = leaderLocator.getLeader();
}
catch ( NoLeaderFoundException e )
{
throw new ReplicationFailureException( "Replication aborted since no leader was available", e );
}

if ( !currentLeader.equals( originalLeader ) )
{
throw new ReplicationFailureException( "Replication aborted since a leader switch was detected" );
}
}

@Override
public void onLeaderSwitch( LeaderInfo leaderInfo )
{

This file was deleted.

@@ -25,8 +25,10 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;

import java.time.Clock;
@@ -50,9 +52,9 @@ public class ReconnectingChannel implements Channel

private final Log log;
private final Bootstrap bootstrap;
private final EventLoop eventLoop;
private final SocketAddress destination;
private final TimeoutStrategy connectionBackoffStrategy;
private final WritabilityThrottler writabilityThrottler;

private volatile io.netty.channel.Channel channel;
private volatile ChannelFuture fChannel;
@@ -62,21 +64,21 @@ public class ReconnectingChannel implements Channel
private TimeoutStrategy.Timeout connectionBackoff;
private CappedLogger cappedLogger;

ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log )
ReconnectingChannel( Bootstrap bootstrap, SocketAddress destination, final Log log )
{
this( bootstrap, eventLoop, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) );
this( bootstrap, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ), new WritabilityThrottler() );
}

private ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log,
TimeoutStrategy connectionBackoffStrategy )
private ReconnectingChannel( Bootstrap bootstrap, SocketAddress destination, final Log log, TimeoutStrategy connectionBackoffStrategy,
WritabilityThrottler writabilityThrottler )
{
this.bootstrap = bootstrap;
this.eventLoop = eventLoop;
this.destination = destination;
this.log = log;
this.cappedLogger = new CappedLogger( log ).setTimeLimit( 20, TimeUnit.SECONDS, Clock.systemUTC() );
this.connectionBackoffStrategy = connectionBackoffStrategy;
this.connectionBackoff = connectionBackoffStrategy.newTimeout();
this.writabilityThrottler = writabilityThrottler;
}

void start()
@@ -97,6 +99,16 @@ else if ( fChannel != null && !fChannel.isDone() )

fChannel = bootstrap.connect( destination.socketAddress() );
channel = fChannel.channel();
writabilityThrottler.setIsWritable( channel.isWritable() );
channel.pipeline().addFirst( new ChannelInboundHandlerAdapter()
{
@Override
public void channelWritabilityChanged( ChannelHandlerContext ctx ) throws Exception
{
writabilityThrottler.setIsWritable( ctx.channel().isWritable() );
super.channelWritabilityChanged( ctx );
}
} );

fChannel.addListener( ( ChannelFuture f ) ->
{
@@ -160,31 +172,40 @@ private Future<Void> write( Object msg, boolean flush )

if ( channel.isActive() )
{
if ( flush )
{
return channel.writeAndFlush( msg );
}
else
{
return channel.write( msg );
}
return awaitAndWrite( msg, flush );
}
else
{
Promise<Void> promise = eventLoop.newPromise();
Promise<Void> promise = new DefaultPromise<>( bootstrap.config().group().next() );
BiConsumer<io.netty.channel.Channel,Object> writer;

writer = ( channel, message ) -> chain( awaitAndWrite( msg, flush ), promise );

deferredWrite( msg, fChannel, promise, true, writer );
return promise;
}
}

private ChannelFuture awaitAndWrite( Object msg, boolean flush )
{
try
{
writabilityThrottler.awaitWritable();
if ( flush )
{
writer = ( channel, message ) -> chain( channel.writeAndFlush( msg ), promise );
return channel.writeAndFlush( msg );
}
else
{
writer = ( channel, message ) -> chain( channel.write( msg ), promise );
return channel.write( msg );
}

deferredWrite( msg, fChannel, promise, true, writer );
return promise;
}
catch ( InterruptedException e )
{
log.warn( "Interrupted while awaiting writability" );
Thread.currentThread().interrupt();
channel.close();
return channel.voidPromise().setFailure( e );
}
}

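A general Netty caveat that applies to this kind of blocking write path (an observation, not something stated in the commit): awaitWritable() must never be reached from the channel's own event loop, because that is the thread which drains the outbound buffer and flips the channel back to writable. A hypothetical guard could make the assumption explicit:

import io.netty.channel.Channel;

class EventLoopGuard
{
    // Hypothetical helper, not part of the commit: blocking for writability on the
    // event loop would deadlock, since that thread is the one draining the buffer.
    static void assertNotEventLoop( Channel channel )
    {
        if ( channel.eventLoop().inEventLoop() )
        {
            throw new IllegalStateException( "Must not block for writability on the event loop thread" );
        }
    }
}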
@@ -107,7 +107,7 @@ private Channel channel( AdvertisedSocketAddress destination )

if ( channel == null )
{
channel = new ReconnectingChannel( bootstrap, eventLoopGroup.next(), destination, log );
channel = new ReconnectingChannel( bootstrap, destination, log );
channel.start();
ReconnectingChannel existingNonBlockingChannel = channels.putIfAbsent( destination, channel );

@@ -0,0 +1,46 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.messaging;

import java.util.concurrent.atomic.AtomicBoolean;

class WritabilityThrottler
{
private AtomicBoolean isWritable = new AtomicBoolean( true );

synchronized void awaitWritable() throws InterruptedException
{
if ( !isWritable.get() )
{
wait();
}
}

synchronized void setIsWritable( boolean isWritable )
{
if ( this.isWritable.compareAndSet( !isWritable, isWritable ) && isWritable )
{
notify();
}
}
}
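When the new throttler actually engages is governed by Netty's write-buffer water marks, which decide when isWritable() flips between true and false. The commit does not touch any bootstrap options, but assuming Netty 4.1, the threshold could be tuned roughly like this (values illustrative):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;

class WaterMarkExample
{
    // Illustrative only: a lower high-water mark makes backpressure engage sooner.
    static Bootstrap withEarlierBackpressure( Bootstrap bootstrap )
    {
        return bootstrap.option( ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark( 8 * 1024, 32 * 1024 ) );
    }
}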
