Skip to content

Commit

Permalink
Reuse of catchup components in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Mar 19, 2018
1 parent ae3a585 commit 2109cd7
Show file tree
Hide file tree
Showing 28 changed files with 790 additions and 649 deletions.
Expand Up @@ -24,6 +24,7 @@

import java.io.OutputStream;
import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -35,6 +36,7 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyClient;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.SupportedProtocolCreator;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
Expand Down Expand Up @@ -141,8 +143,9 @@ private Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> chann
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstallerRepository = new ProtocolInstallerRepository<>(
singletonList( new CatchupProtocolClientInstaller.Factory( clientPipelineBuilderFactory, logProvider, handler ) ),
ModifierProtocolInstaller.allClientInstallers );
Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );
return new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository, protocolInstallerRepository,
clientPipelineBuilderFactory, config, logProvider );
clientPipelineBuilderFactory, handshakeTimeout , logProvider );
};
}

Expand Down
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.catchup;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocols;
import org.neo4j.causalclustering.protocol.Protocol.ModifierProtocols;
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstaller.Orientation.Client;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory.VOID_WRAPPER;
import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocolCategory.CATCHUP;
import static org.neo4j.time.Clocks.systemClock;

public class CatchupClientBuilder
{
private Duration handshakeTimeout = Duration.ofSeconds( 5 );
private LogProvider logProvider = NullLogProvider.getInstance();
private NettyPipelineBuilderFactory pipelineBuilder = new NettyPipelineBuilderFactory( VOID_WRAPPER );
private ApplicationSupportedProtocols catchupProtocols = new ApplicationSupportedProtocols( CATCHUP, emptyList() );
private Collection<ModifierSupportedProtocols> modifierProtocols = emptyList();
private long inactivityTimeoutMillis = SECONDS.toMillis( 20 );
private Clock clock = systemClock();

public CatchupClientBuilder()
{
}

public CatchupClientBuilder( ApplicationSupportedProtocols catchupProtocols, Collection<ModifierSupportedProtocols> modifierProtocols,
NettyPipelineBuilderFactory pipelineBuilder, Duration handshakeTimeout, long inactivityTimeoutMillis, LogProvider logProvider, Clock clock )
{
this.catchupProtocols = catchupProtocols;
this.modifierProtocols = modifierProtocols;
this.pipelineBuilder = pipelineBuilder;
this.handshakeTimeout = handshakeTimeout;
this.logProvider = logProvider;
this.inactivityTimeoutMillis = inactivityTimeoutMillis;
this.clock = clock;
}

public CatchupClientBuilder catchupProtocols( ApplicationSupportedProtocols catchupProtocols )
{
this.catchupProtocols = catchupProtocols;
return this;
}

public CatchupClientBuilder modifierProtocols( Collection<ModifierSupportedProtocols> modifierProtocols )
{
this.modifierProtocols = modifierProtocols;
return this;
}

public CatchupClientBuilder pipelineBuilder( NettyPipelineBuilderFactory pipelineBuilder )
{
this.pipelineBuilder = pipelineBuilder;
return this;
}

public CatchupClientBuilder handshakeTimeout( Duration handshakeTimeout )
{
this.handshakeTimeout = handshakeTimeout;
return this;
}

public CatchupClientBuilder inactivityTimeoutMillis( long inactivityTimeoutMillis )
{
this.inactivityTimeoutMillis = inactivityTimeoutMillis;
return this;
}

public CatchupClientBuilder logProvider( LogProvider logProvider )
{
this.logProvider = logProvider;
return this;
}

public CatchupClientBuilder clock( Clock clock )
{
this.clock = clock;
return this;
}

public CatchUpClient build()
{
ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols );
ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository( ModifierProtocols.values(), modifierProtocols );

Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer = handler -> {
List<ProtocolInstaller.Factory<Client,?>> installers = singletonList(
new CatchupProtocolClientInstaller.Factory( pipelineBuilder, logProvider, handler ) );

ProtocolInstallerRepository<Client> protocolInstallerRepository = new ProtocolInstallerRepository<>( installers,
ModifierProtocolInstaller.allClientInstallers );

return new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository, protocolInstallerRepository, pipelineBuilder,
handshakeTimeout, logProvider );
};

return new CatchUpClient( logProvider, clock, inactivityTimeoutMillis, channelInitializer );
}
}
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.causalclustering.catchup;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

Expand Down Expand Up @@ -102,7 +103,7 @@ public void install( Channel channel ) throws Exception
.add( "enc_snapshot", new CoreSnapshotEncoder() )
.add( "enc_file_chunk", new FileChunkEncoder() )
.add( "enc_file_header", new FileHeaderEncoder() )
.add( "in_req_type", handler.serverMessageHandler() )
.add( "in_req_type", serverMessageHandler( state ) )
.add( "dec_req_dispatch", requestDecoders( state ) )
.add( "out_chunked_write", new ChunkedWriteHandler() )
.add( "hnd_req_tx", handler.txPullRequestHandler() )
Expand All @@ -114,6 +115,11 @@ public void install( Channel channel ) throws Exception
.install();
}

private ChannelHandler serverMessageHandler( CatchupServerProtocol state )
{
return new ServerMessageTypeHandler( state, logProvider );
}

private ChannelInboundHandler requestDecoders( CatchupServerProtocol protocol )
{
RequestDecoderDispatcher<CatchupServerProtocol.State> decoderDispatcher = new RequestDecoderDispatcher<>( protocol, logProvider );
Expand Down
@@ -0,0 +1,129 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.catchup;

import io.netty.channel.ChannelInboundHandler;

import java.util.Collection;
import java.util.function.Function;

import org.neo4j.causalclustering.net.Server;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocols;
import org.neo4j.causalclustering.protocol.Protocol.ModifierProtocols;
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.HandshakeServerInitializer;
import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory.VOID_WRAPPER;
import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocolCategory.CATCHUP;

public class CatchupServerBuilder
{
private final Function<CatchupServerProtocol,CatchupServerHandler> handlerFactory;
private LogProvider debugLogProvider = NullLogProvider.getInstance();
private LogProvider userLogProvider = NullLogProvider.getInstance();
private NettyPipelineBuilderFactory pipelineBuilder = new NettyPipelineBuilderFactory( VOID_WRAPPER );
private ApplicationSupportedProtocols catchupProtocols = new ApplicationSupportedProtocols( CATCHUP, emptyList() );
private Collection<ModifierSupportedProtocols> modifierProtocols = emptyList();
private ChannelInboundHandler parentHandler;
private ListenSocketAddress listenAddress;
private String serverName = "catchup-server";

public CatchupServerBuilder( Function<CatchupServerProtocol,CatchupServerHandler> handlerFactory )
{
this.handlerFactory = handlerFactory;
}

public CatchupServerBuilder catchupProtocols( ApplicationSupportedProtocols catchupProtocols )
{
this.catchupProtocols = catchupProtocols;
return this;
}

public CatchupServerBuilder modifierProtocols( Collection<ModifierSupportedProtocols> modifierProtocols )
{
this.modifierProtocols = modifierProtocols;
return this;
}

public CatchupServerBuilder pipelineBuilder( NettyPipelineBuilderFactory pipelineBuilder )
{
this.pipelineBuilder = pipelineBuilder;
return this;
}

public CatchupServerBuilder serverHandler( ChannelInboundHandler parentHandler )
{
this.parentHandler = parentHandler;
return this;
}

public CatchupServerBuilder listenAddress( ListenSocketAddress listenAddress )
{
this.listenAddress = listenAddress;
return this;
}

public CatchupServerBuilder userLogProvider( LogProvider userLogProvider )
{
this.userLogProvider = userLogProvider;
return this;
}

public CatchupServerBuilder debugLogProvider( LogProvider debugLogProvider )
{
this.debugLogProvider = debugLogProvider;
return this;
}

public CatchupServerBuilder serverName( String serverName )
{
this.serverName = serverName;
return this;
}

public Server build()
{
ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols );
ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository( ModifierProtocols.values(), modifierProtocols );

CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( pipelineBuilder, debugLogProvider,
handlerFactory );

ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> protocolInstallerRepository = new ProtocolInstallerRepository<>(
singletonList( catchupProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers );

HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository,
protocolInstallerRepository, pipelineBuilder, debugLogProvider );

return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName );
}
}

0 comments on commit 2109cd7

Please sign in to comment.