Skip to content

Commit

Permalink
New CatchupClient synchronous messaging.
Browse files Browse the repository at this point in the history
  • Loading branch information
apcj committed Aug 22, 2016
1 parent 72f261b commit 3a32d1e
Show file tree
Hide file tree
Showing 42 changed files with 1,081 additions and 839 deletions.
@@ -0,0 +1,189 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;

import java.time.Clock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import org.neo4j.coreedge.discovery.NoKnownAddressesException;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.CatchUpRequest;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MICROSECONDS;

public class CatchUpClient extends LifecycleAdapter
{
private final LogProvider logProvider;
private final TopologyService discoveryService;
private final Clock clock;

private final Map<AdvertisedSocketAddress, CatchUpChannel> idleChannels = new HashMap<>();
private final Set<CatchUpChannel> activeChannels = new HashSet<>();
private final Log log;

private NioEventLoopGroup eventLoopGroup;

public CatchUpClient( TopologyService discoveryService, LogProvider logProvider, Clock clock )
{
this.logProvider = logProvider;
this.discoveryService = discoveryService;
this.log = logProvider.getLog( getClass() );
this.clock = clock;
}

public <T> T makeBlockingRequest( MemberId memberId, CatchUpRequest request,
long inactivityTimeout, TimeUnit timeUnit,
CatchUpResponseCallback<T> responseHandler )
throws CatchUpClientException, NoKnownAddressesException
{
CompletableFuture<T> future = new CompletableFuture<>();
CatchUpChannel channel = acquireChannel( memberId );

future.whenComplete( ( result, e ) -> {
if ( e == null )
{
release( channel );
}
else
{
dispose( channel );
}
} );

channel.setResponseHandler( responseHandler, future );
channel.send( request );

return TimeoutLoop.waitForCompletion( future, channel::millisSinceLastResponse, inactivityTimeout, timeUnit );
}

private synchronized void dispose( CatchUpChannel channel )
{
activeChannels.remove( channel );
channel.close();
}

private synchronized void release( CatchUpChannel channel )
{
activeChannels.remove( channel );
idleChannels.put( channel.destination, channel );
}

private synchronized CatchUpChannel acquireChannel( MemberId memberId ) throws NoKnownAddressesException
{
AdvertisedSocketAddress catchUpAddress =
discoveryService.currentTopology().coreAddresses( memberId ).getCatchupServer();
CatchUpChannel channel = idleChannels.remove( catchUpAddress );
if ( channel == null )
{
channel = new CatchUpChannel( catchUpAddress );
}
activeChannels.add( channel );
return channel;
}

private class CatchUpChannel
{
private final TrackingResponseHandler handler;
private final AdvertisedSocketAddress destination;
private Channel nettyChannel;

CatchUpChannel( AdvertisedSocketAddress destination )
{
this.destination = destination;
handler = new TrackingResponseHandler( new CatchUpResponseAdaptor(), clock );
Bootstrap bootstrap = new Bootstrap()
.group( eventLoopGroup )
.channel( NioSocketChannel.class )
.handler( new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel( SocketChannel ch ) throws Exception
{
CatchUpClientChannelPipeline.initChannel( ch, handler, logProvider );
}
} );

ChannelFuture channelFuture = bootstrap.connect( destination.socketAddress() );
nettyChannel = channelFuture.awaitUninterruptibly().channel();
}

void setResponseHandler( CatchUpResponseCallback responseHandler,
CompletableFuture<?> requestOutcomeSignal )
{
handler.setResponseHandler( responseHandler, requestOutcomeSignal );
}

void send( CatchUpRequest request )
{
nettyChannel.write( request.messageType() );
nettyChannel.writeAndFlush( request );
}

long millisSinceLastResponse()
{
return clock.millis() - handler.lastResponseTime();
}

void close()
{
nettyChannel.close();
}
}

@Override
public void start()
{
eventLoopGroup = new NioEventLoopGroup( 0, new NamedThreadFactory( "catch-up-client" ) );
}

@Override
public void stop() throws Throwable
{
try
{
idleChannels.values().forEach( CatchUpChannel::close );
activeChannels.forEach( CatchUpChannel::close );
eventLoopGroup.shutdownGracefully( 0, 0, MICROSECONDS ).sync();
}
catch ( InterruptedException e )
{
log.warn( "Interrupted while stopping catch up client." );
}
}
}
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;

import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import org.neo4j.coreedge.VersionDecoder;
import org.neo4j.coreedge.VersionPrepender;
import org.neo4j.coreedge.catchup.storecopy.FileContentDecoder;
import org.neo4j.coreedge.catchup.storecopy.FileContentHandler;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderDecoder;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderHandler;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequestEncoder;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdResponseDecoder;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdResponseHandler;
import org.neo4j.coreedge.catchup.storecopy.GetStoreRequestEncoder;
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFinishedResponseDecoder;
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFinishedResponseHandler;
import org.neo4j.coreedge.catchup.tx.TxPullRequestEncoder;
import org.neo4j.coreedge.catchup.tx.TxPullResponseDecoder;
import org.neo4j.coreedge.catchup.tx.TxPullResponseHandler;
import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseDecoder;
import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseHandler;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotDecoder;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequestEncoder;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotResponseHandler;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.logging.LogProvider;

class CatchUpClientChannelPipeline
{
static void initChannel( SocketChannel ch, CatchUpResponseHandler handler, LogProvider logProvider )
throws Exception
{
CatchupClientProtocol protocol = new CatchupClientProtocol();

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) );

pipeline.addLast( new VersionDecoder( logProvider ) );
pipeline.addLast( new VersionPrepender() );

pipeline.addLast( new TxPullRequestEncoder() );
pipeline.addLast( new GetStoreRequestEncoder() );
pipeline.addLast( new CoreSnapshotRequestEncoder() );
pipeline.addLast( new GetStoreIdRequestEncoder() );
pipeline.addLast( new ResponseMessageTypeEncoder() );
pipeline.addLast( new RequestMessageTypeEncoder() );

pipeline.addLast( new ClientMessageTypeHandler( protocol, logProvider ) );

RequestDecoderDispatcher<CatchupClientProtocol.State> decoderDispatcher =
new RequestDecoderDispatcher<>( protocol, logProvider );
decoderDispatcher.register( CatchupClientProtocol.State.STORE_ID, new GetStoreIdResponseDecoder() );
decoderDispatcher.register( CatchupClientProtocol.State.TX_PULL_RESPONSE, new TxPullResponseDecoder() );
decoderDispatcher.register( CatchupClientProtocol.State.CORE_SNAPSHOT, new CoreSnapshotDecoder() );
decoderDispatcher.register( CatchupClientProtocol.State.STORE_COPY_FINISHED, new
StoreCopyFinishedResponseDecoder() );
decoderDispatcher.register( CatchupClientProtocol.State.TX_STREAM_FINISHED, new
TxStreamFinishedResponseDecoder() );
decoderDispatcher.register( CatchupClientProtocol.State.FILE_HEADER, new FileHeaderDecoder() );
decoderDispatcher.register( CatchupClientProtocol.State.FILE_CONTENTS, new FileContentDecoder() );

pipeline.addLast( decoderDispatcher );

pipeline.addLast( new TxPullResponseHandler( protocol, handler ) );
pipeline.addLast( new CoreSnapshotResponseHandler( protocol, handler ) );
pipeline.addLast( new StoreCopyFinishedResponseHandler( protocol, handler ) );
pipeline.addLast( new TxStreamFinishedResponseHandler( protocol, handler ) );
pipeline.addLast( new FileHeaderHandler( protocol, handler, logProvider ) );
pipeline.addLast( new FileContentHandler( protocol, handler ) );
pipeline.addLast( new GetStoreIdResponseHandler( protocol, handler ) );

pipeline.addLast( new ExceptionLoggingHandler( logProvider.getLog( CatchUpClient.class ) ) );
}
}
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;

public class CatchUpClientException extends Exception
{
public CatchUpClientException( Throwable cause )
{
super( cause );
}
}
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;

import static java.lang.String.format;

public class CatchUpProtocolViolationException extends Exception
{
public CatchUpProtocolViolationException( String message, Object... args )
{
super( format( message, args ) );
}
}

0 comments on commit 3a32d1e

Please sign in to comment.