Skip to content

Commit

Permalink
Single server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Mar 19, 2018
1 parent 0231be1 commit 77fdc94
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 238 deletions.
Expand Up @@ -65,6 +65,7 @@
import org.neo4j.causalclustering.messaging.Outbound; import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.messaging.RaftOutbound; import org.neo4j.causalclustering.messaging.RaftOutbound;
import org.neo4j.causalclustering.messaging.SenderService; import org.neo4j.causalclustering.messaging.SenderService;
import org.neo4j.causalclustering.net.InstalledProtocolHandler;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol; import org.neo4j.causalclustering.protocol.Protocol;
Expand Down Expand Up @@ -291,13 +292,15 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
this.commitProcessFactory = coreStateMachinesModule.commitProcessFactory; this.commitProcessFactory = coreStateMachinesModule.commitProcessFactory;
this.accessCapability = new LeaderCanWrite( consensusModule.raftMachine() ); this.accessCapability = new LeaderCanWrite( consensusModule.raftMachine() );


InstalledProtocolHandler serverInstalledProtocolHandler = new InstalledProtocolHandler();

CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule, coreStateMachinesModule, clusteringModule, CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule, coreStateMachinesModule, clusteringModule,
replicationModule, localDatabase, databaseHealthSupplier, clusterStateDirectory.get(), clientPipelineBuilderFactory, replicationModule, localDatabase, databaseHealthSupplier, clusterStateDirectory.get(), clientPipelineBuilderFactory,
serverPipelineBuilderFactory ); serverPipelineBuilderFactory, serverInstalledProtocolHandler );


RaftServerModule raftServerModule = RaftServerModule.createAndStart( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, RaftServerModule.createAndStart( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, serverPipelineBuilderFactory,
serverPipelineBuilderFactory, messageLogger, topologyService, supportedRaftProtocols, supportedModifierProtocols ); messageLogger, topologyService, supportedRaftProtocols, supportedModifierProtocols, serverInstalledProtocolHandler );
serverInstalledProtocols = raftServerModule.raftServer()::installedProtocols; this.serverInstalledProtocols = serverInstalledProtocolHandler::installedProtocols;


editionInvariants( platformModule, dependencies, config, logging, life ); editionInvariants( platformModule, dependencies, config, logging, life );


Expand Down
Expand Up @@ -19,6 +19,8 @@
*/ */
package org.neo4j.causalclustering.core; package org.neo4j.causalclustering.core;


import io.netty.channel.ChannelHandler;

import java.util.Collection; import java.util.Collection;
import java.util.function.Function; import java.util.function.Function;


Expand All @@ -30,7 +32,6 @@
import org.neo4j.causalclustering.core.consensus.RaftMessageNettyHandler; import org.neo4j.causalclustering.core.consensus.RaftMessageNettyHandler;
import org.neo4j.causalclustering.core.consensus.RaftMessages.ReceivedInstantClusterIdAwareMessage; import org.neo4j.causalclustering.core.consensus.RaftMessages.ReceivedInstantClusterIdAwareMessage;
import org.neo4j.causalclustering.core.consensus.RaftProtocolServerInstaller; import org.neo4j.causalclustering.core.consensus.RaftProtocolServerInstaller;
import org.neo4j.causalclustering.core.consensus.RaftServer;
import org.neo4j.causalclustering.core.server.CoreServerModule; import org.neo4j.causalclustering.core.server.CoreServerModule;
import org.neo4j.causalclustering.core.state.RaftMessageApplier; import org.neo4j.causalclustering.core.state.RaftMessageApplier;
import org.neo4j.causalclustering.discovery.CoreTopologyService; import org.neo4j.causalclustering.discovery.CoreTopologyService;
Expand All @@ -40,6 +41,7 @@
import org.neo4j.causalclustering.messaging.ComposableMessageHandler; import org.neo4j.causalclustering.messaging.ComposableMessageHandler;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler; import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.causalclustering.messaging.LoggingInbound; import org.neo4j.causalclustering.messaging.LoggingInbound;
import org.neo4j.causalclustering.net.Server;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol; import org.neo4j.causalclustering.protocol.Protocol;
Expand All @@ -50,6 +52,7 @@
import org.neo4j.causalclustering.protocol.handshake.HandshakeServerInitializer; import org.neo4j.causalclustering.protocol.handshake.HandshakeServerInitializer;
import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository; import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols; import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.impl.factory.PlatformModule; import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.JobScheduler;
Expand All @@ -68,12 +71,11 @@ class RaftServerModule
private final NettyPipelineBuilderFactory pipelineBuilderFactory; private final NettyPipelineBuilderFactory pipelineBuilderFactory;
private final TopologyService topologyService; private final TopologyService topologyService;
private final Collection<ModifierSupportedProtocols> supportedModifierProtocols; private final Collection<ModifierSupportedProtocols> supportedModifierProtocols;
private final RaftServer raftServer;


private RaftServerModule( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule, CoreServerModule coreServerModule, private RaftServerModule( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule, CoreServerModule coreServerModule,
LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory, MessageLogger<MemberId> messageLogger, LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory, MessageLogger<MemberId> messageLogger,
CoreTopologyService topologyService, ApplicationSupportedProtocols supportedApplicationProtocol, CoreTopologyService topologyService, ApplicationSupportedProtocols supportedApplicationProtocol,
Collection<ModifierSupportedProtocols> supportedModifierProtocols ) Collection<ModifierSupportedProtocols> supportedModifierProtocols, ChannelHandler installedProtocolsHandler )
{ {
this.platformModule = platformModule; this.platformModule = platformModule;
this.consensusModule = consensusModule; this.consensusModule = consensusModule;
Expand All @@ -88,20 +90,20 @@ private RaftServerModule( PlatformModule platformModule, ConsensusModule consens


LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> messageHandlerChain = createMessageHandlerChain( coreServerModule ); LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> messageHandlerChain = createMessageHandlerChain( coreServerModule );


raftServer = createRaftServer( coreServerModule, messageHandlerChain ); createRaftServer( coreServerModule, messageHandlerChain, installedProtocolsHandler );
} }


static RaftServerModule createAndStart( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule, static void createAndStart( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule,
CoreServerModule coreServerModule, LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory, CoreServerModule coreServerModule, LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory,
MessageLogger<MemberId> messageLogger, CoreTopologyService topologyService, ApplicationSupportedProtocols supportedApplicationProtocol, MessageLogger<MemberId> messageLogger, CoreTopologyService topologyService, ApplicationSupportedProtocols supportedApplicationProtocol,
Collection<ModifierSupportedProtocols> supportedModifierProtocols ) Collection<ModifierSupportedProtocols> supportedModifierProtocols, ChannelHandler installedProtocolsHandler )
{ {
return new RaftServerModule( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, pipelineBuilderFactory, messageLogger, new RaftServerModule( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, pipelineBuilderFactory, messageLogger,
topologyService, supportedApplicationProtocol, supportedModifierProtocols ); topologyService, supportedApplicationProtocol, supportedModifierProtocols, installedProtocolsHandler );
} }


private RaftServer createRaftServer( CoreServerModule coreServerModule, private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> messageHandlerChain,
LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> messageHandlerChain ) ChannelHandler installedProtocolsHandler )
{ {
ApplicationProtocolRepository applicationProtocolRepository = ApplicationProtocolRepository applicationProtocolRepository =
new ApplicationProtocolRepository( Protocol.ApplicationProtocols.values(), supportedApplicationProtocol ); new ApplicationProtocolRepository( Protocol.ApplicationProtocols.values(), supportedApplicationProtocol );
Expand All @@ -114,10 +116,12 @@ private RaftServer createRaftServer( CoreServerModule coreServerModule,
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> protocolInstallerRepository = ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> protocolInstallerRepository =
new ProtocolInstallerRepository<>( singletonList( raftProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers ); new ProtocolInstallerRepository<>( singletonList( raftProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers );


HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository,
applicationProtocolRepository, modifierProtocolRepository, protocolInstallerRepository, pipelineBuilderFactory, logProvider ); protocolInstallerRepository, pipelineBuilderFactory, logProvider );
RaftServer raftServer = new RaftServer( handshakeServerInitializer, platformModule.config,
logProvider, platformModule.logging.getUserLogProvider() ); ListenSocketAddress raftListenAddress = platformModule.config.get( CausalClusteringSettings.raft_listen_address );
Server raftServer = new Server( handshakeServerInitializer, installedProtocolsHandler, logProvider, platformModule.logging.getUserLogProvider(),
raftListenAddress, "raft-server" );


LoggingInbound<ReceivedInstantClusterIdAwareMessage<?>> loggingRaftInbound = LoggingInbound<ReceivedInstantClusterIdAwareMessage<?>> loggingRaftInbound =
new LoggingInbound<>( nettyHandler, messageLogger, identityModule.myself() ); new LoggingInbound<>( nettyHandler, messageLogger, identityModule.myself() );
Expand All @@ -126,10 +130,8 @@ private RaftServer createRaftServer( CoreServerModule coreServerModule,
platformModule.life.add( raftServer ); // must start before core state so that it can trigger snapshot downloads when necessary platformModule.life.add( raftServer ); // must start before core state so that it can trigger snapshot downloads when necessary
platformModule.life.add( coreServerModule.createCoreLife( messageHandlerChain ) ); platformModule.life.add( coreServerModule.createCoreLife( messageHandlerChain ) );
platformModule.life.add( coreServerModule.catchupServer() ); // must start last and stop first, since it handles external requests platformModule.life.add( coreServerModule.catchupServer() ); // must start last and stop first, since it handles external requests
coreServerModule.backupCatchupServer().ifPresent( platformModule.life::add ); coreServerModule.backupServer().ifPresent( platformModule.life::add );
platformModule.life.add( coreServerModule.downloadService() ); platformModule.life.add( coreServerModule.downloadService() );

return raftServer;
} }


private LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> createMessageHandlerChain( CoreServerModule coreServerModule ) private LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> createMessageHandlerChain( CoreServerModule coreServerModule )
Expand Down Expand Up @@ -157,9 +159,4 @@ private LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> createM
.compose( monitoringHandler ) .compose( monitoringHandler )
.apply( messageApplier ); .apply( messageApplier );
} }

public RaftServer raftServer()
{
return raftServer;
}
} }
Expand Up @@ -19,12 +19,13 @@
*/ */
package org.neo4j.causalclustering.core; package org.neo4j.causalclustering.core;


import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;


import java.util.Optional; import java.util.Optional;


import org.neo4j.causalclustering.catchup.CatchupServer; import org.neo4j.causalclustering.net.Server;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings; import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
Expand All @@ -34,22 +35,25 @@ public class TransactionBackupServiceProvider
private final LogProvider logProvider; private final LogProvider logProvider;
private final LogProvider userLogProvider; private final LogProvider userLogProvider;
private final TransactionBackupServiceAddressResolver transactionBackupServiceAddressResolver; private final TransactionBackupServiceAddressResolver transactionBackupServiceAddressResolver;
private ChannelInitializer<SocketChannel> channelInitializer; private final ChannelInitializer<SocketChannel> channelInitializer;
private final ChannelHandler serverHandler;


public TransactionBackupServiceProvider( LogProvider logProvider, LogProvider userLogProvider, ChannelInitializer<SocketChannel> channelInitializer ) public TransactionBackupServiceProvider( LogProvider logProvider, LogProvider userLogProvider, ChannelInitializer<SocketChannel> channelInitializer,
ChannelHandler serverHandler )
{ {
this.logProvider = logProvider; this.logProvider = logProvider;
this.userLogProvider = userLogProvider; this.userLogProvider = userLogProvider;
this.channelInitializer = channelInitializer; this.channelInitializer = channelInitializer;
this.serverHandler = serverHandler;
this.transactionBackupServiceAddressResolver = new TransactionBackupServiceAddressResolver(); this.transactionBackupServiceAddressResolver = new TransactionBackupServiceAddressResolver();
} }


public Optional<CatchupServer> resolveIfBackupEnabled( Config config ) public Optional<Server> resolveIfBackupEnabled( Config config )
{ {
if ( config.get( OnlineBackupSettings.online_backup_enabled ) ) if ( config.get( OnlineBackupSettings.online_backup_enabled ) )
{ {
return Optional.of( new CatchupServer( channelInitializer, logProvider, userLogProvider, return Optional.of( new Server( channelInitializer, serverHandler, logProvider, userLogProvider,
transactionBackupServiceAddressResolver.backupAddressForTxProtocol( config ) ) ); transactionBackupServiceAddressResolver.backupAddressForTxProtocol( config ), "backup-server" ) );
} }
else else
{ {
Expand Down

This file was deleted.

0 comments on commit 77fdc94

Please sign in to comment.