Skip to content

Commit

Permalink
More refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Sep 10, 2018
1 parent a18a4bc commit 2cde460
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 178 deletions.
Expand Up @@ -88,7 +88,8 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
progressRetryStrategy,
leaderRetryStrategy,
availabilityTimeoutMillis,
globalAvailabilityGuard, logProvider, platformModule.monitors );
globalAvailabilityGuard, logProvider,
platformModule.monitors );
}

public RaftReplicator getReplicator()
Expand Down
Expand Up @@ -109,32 +109,18 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception
*/
channel.putInt( -1 );
}
try

// write to chunks if empty and there is more to write
while ( txWriter.canWrite() && chunks.isEmpty() )
{
// write to chunks if empty and there is more to write
while ( txWriter.canWrite() && chunks.isEmpty() )
{
txWriter.write( channel );
}
// nothing more to write, close the channel to get the potential last buffer
if ( chunks.isEmpty() )
{
channel.close();
}
return chunks.poll();
txWriter.write( channel );
}
catch ( Throwable t )
// nothing more to write, close the channel to get the potential last buffer
if ( chunks.isEmpty() )
{
try
{
close();
}
catch ( Exception e )
{
t.addSuppressed( e );
}
throw t;
channel.close();
}
return chunks.poll();
}

@Override
Expand Down
Expand Up @@ -25,8 +25,8 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;

import java.time.Clock;
Expand All @@ -50,6 +50,7 @@ public class ReconnectingChannel implements Channel

private final Log log;
private final Bootstrap bootstrap;
private final EventLoop eventLoop;
private final SocketAddress destination;
private final TimeoutStrategy connectionBackoffStrategy;

Expand All @@ -61,14 +62,16 @@ public class ReconnectingChannel implements Channel
private TimeoutStrategy.Timeout connectionBackoff;
private CappedLogger cappedLogger;

ReconnectingChannel( Bootstrap bootstrap, SocketAddress destination, final Log log )
ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log )
{
this( bootstrap, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) );
this( bootstrap, eventLoop, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) );
}

private ReconnectingChannel( Bootstrap bootstrap, SocketAddress destination, final Log log, TimeoutStrategy connectionBackoffStrategy )
private ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log,
TimeoutStrategy connectionBackoffStrategy )
{
this.bootstrap = bootstrap;
this.eventLoop = eventLoop;
this.destination = destination;
this.log = log;
this.cappedLogger = new CappedLogger( log ).setTimeLimit( 20, TimeUnit.SECONDS, Clock.systemUTC() );
Expand Down Expand Up @@ -157,32 +160,34 @@ private Future<Void> write( Object msg, boolean flush )

if ( channel.isActive() )
{
return doWrite( msg, flush );
if ( flush )
{
return channel.writeAndFlush( msg );
}
else
{
return channel.write( msg );
}
}
else
{
Promise<Void> promise = new DefaultPromise<>( bootstrap.config().group().next() );
Promise<Void> promise = eventLoop.newPromise();
BiConsumer<io.netty.channel.Channel,Object> writer;

writer = ( channel, message ) -> chain( doWrite( msg, flush ), promise );
if ( flush )
{
writer = ( channel, message ) -> chain( channel.writeAndFlush( msg ), promise );
}
else
{
writer = ( channel, message ) -> chain( channel.write( msg ), promise );
}

deferredWrite( msg, fChannel, promise, true, writer );
return promise;
}
}

private ChannelFuture doWrite( Object msg, boolean flush )
{
if ( flush )
{
return channel.writeAndFlush( msg );
}
else
{
return channel.write( msg );
}
}

/**
* Chains a channel future to a promise. Used when the returned promise
* was not allocated through the channel and cannot be used as the
Expand Down
Expand Up @@ -107,7 +107,7 @@ private Channel channel( AdvertisedSocketAddress destination )

if ( channel == null )
{
channel = new ReconnectingChannel( bootstrap, destination, log );
channel = new ReconnectingChannel( bootstrap, eventLoopGroup.next(), destination, log );
channel.start();
ReconnectingChannel existingNonBlockingChannel = channels.putIfAbsent( destination, channel );

Expand Down

This file was deleted.

Expand Up @@ -41,7 +41,7 @@ public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder

ReplicatedContentChunkDecoder()
{
setCumulator( new ContentChunkAccumulator() );
setCumulator( new ContentChunkCumulator() );
}

@Override
Expand All @@ -60,7 +60,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
}
}

private class ContentChunkAccumulator implements Cumulator
private class ContentChunkCumulator implements Cumulator
{
@Override
public ByteBuf cumulate( ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in )
Expand Down
Expand Up @@ -90,7 +90,7 @@ public void before()
{
elg = new NioEventLoopGroup( 0 );
Bootstrap bootstrap = new Bootstrap().channel( NioSocketChannel.class ).group( elg ).handler( childCounter );
channel = new ReconnectingChannel( bootstrap, listenAddress, log );
channel = new ReconnectingChannel( bootstrap, elg.next(), listenAddress, log );
}

@After
Expand Down

This file was deleted.

0 comments on commit 2cde460

Please sign in to comment.