Skip to content

Commit

Permalink
Extract CoreServerModule.
Browse files Browse the repository at this point in the history
  • Loading branch information
apcj committed Jul 25, 2016
1 parent 6eb6aac commit 6a19fcd
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 209 deletions.
@@ -0,0 +1,180 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge;

import java.io.File;
import java.io.IOException;
import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.CatchupServer;
import org.neo4j.coreedge.catchup.CheckpointerSupplier;
import org.neo4j.coreedge.catchup.DataSourceSupplier;
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.core.CoreToCoreClient;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreCopyClient;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFetcher;
import org.neo4j.coreedge.catchup.tx.edge.TransactionLogCatchUpFactory;
import org.neo4j.coreedge.catchup.tx.edge.TxPullClient;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.raft.BatchingMessageHandler;
import org.neo4j.coreedge.raft.ConsensusModule;
import org.neo4j.coreedge.raft.ContinuousJob;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.pruning.PruningScheduler;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal;
import org.neo4j.coreedge.raft.net.LoggingInbound;
import org.neo4j.coreedge.raft.state.CoreState;
import org.neo4j.coreedge.raft.state.CoreStateApplier;
import org.neo4j.coreedge.raft.state.CoreStateDownloader;
import org.neo4j.coreedge.raft.state.DurableStateStorage;
import org.neo4j.coreedge.raft.state.LongIndexMarshal;
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.server.CoreEdgeClusterSettings;
import org.neo4j.coreedge.server.ListenSocketAddress;
import org.neo4j.coreedge.server.MemberId;
import org.neo4j.coreedge.server.NonBlockingChannels;
import org.neo4j.coreedge.server.core.MembershipWaiterLifecycle;
import org.neo4j.coreedge.server.core.NotMyselfSelectionStrategy;
import org.neo4j.coreedge.server.logging.MessageLogger;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD;

/**
 * Wires together the server-side components of a Core cluster member: durable
 * last-flushed state, the Raft message server, the core state machine, the
 * store-copy/transaction-pull client, and the catchup server.
 * <p>
 * Exposes two lifecycles for the caller to manage:
 * <ul>
 *   <li>{@link #startupLifecycle} — core state, raft server and catchup server,
 *       started together (presumably later in the overall startup sequence by
 *       CoreStartupProcess — confirm against callers);</li>
 *   <li>{@link #membershipWaiterLifecycle} — blocks startup completion until this
 *       member has caught up with the cluster.</li>
 * </ul>
 * Components registered with the platform's {@code life} are managed by the
 * platform; registration order determines start order, so the statement order
 * in the constructor is significant.
 */
public class CoreServerModule
{
// Lifecycle for components that must be started explicitly by the caller,
// in add-order: coreState, raftServer, catchupServer (see end of constructor).
public final LifeSupport startupLifecycle;
public final MembershipWaiterLifecycle membershipWaiterLifecycle;

public CoreServerModule( MemberId myself, final PlatformModule platformModule, ConsensusModule consensusModule, CoreStateMachinesModule coreStateMachinesModule, ReplicationModule replicationModule, File clusterStateDirectory, CoreTopologyService
discoveryService, LocalDatabase localDatabase, MessageLogger<MemberId> messageLogger )
{
final Dependencies dependencies = platformModule.dependencies;
final Config config = platformModule.config;
final LogService logging = platformModule.logging;
final FileSystemAbstraction fileSystem = platformModule.fileSystem;
final LifeSupport life = platformModule.life;
LogProvider logProvider = logging.getInternalLogProvider();

final Supplier<DatabaseHealth> databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class );

// Durable record of the last log index flushed to the state machines; recovered
// from disk here and used both for membership recovery and by CoreState below.
StateStorage<Long> lastFlushedStorage;

try
{
lastFlushedStorage = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "last-flushed-state" ),
"last-flushed", new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ),
databaseHealthSupplier, logProvider ) );
}
catch ( IOException e )
{
// Storage creation failure is unrecoverable at this point; surface as unchecked.
throw new RuntimeException( e );
}
// Tell the membership manager where in the log to begin recovery — must happen
// after the last-flushed state has been read from disk above.
consensusModule.raftMembershipManager().setRecoverFromIndex( lastFlushedStorage.getInitialState() );

ListenSocketAddress raftListenAddress = config.get( CoreEdgeClusterSettings.raft_listen_address );

RaftServer raftServer = new RaftServer( new CoreReplicatedContentMarshal(), raftListenAddress, logProvider );

// Wrap the raft server's inbound channel so every received message is recorded
// by the message logger before being handed to the registered handler.
LoggingInbound<RaftMessages.StoreIdAwareMessage> loggingRaftInbound =
new LoggingInbound<>( raftServer, messageLogger, myself );

NonBlockingChannels nonBlockingChannels = new NonBlockingChannels();

CoreToCoreClient.ChannelInitializer channelInitializer =
new CoreToCoreClient.ChannelInitializer( logProvider, nonBlockingChannels );

int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size );
long logThresholdMillis = config.get( CoreEdgeClusterSettings.unknown_address_logging_throttle );

CoreToCoreClient coreToCoreClient = life.add(
new CoreToCoreClient( logProvider, channelInitializer, platformModule.monitors, maxQueueSize,
nonBlockingChannels, discoveryService, logThresholdMillis ) );
// Mutual dependency between client and initializer is resolved after construction.
channelInitializer.setOwner( coreToCoreClient );

// Fetches store files and transaction logs from other core members, used when
// this member must download state rather than catch up via the raft log.
StoreFetcher storeFetcher = new StoreFetcher( logProvider, fileSystem, platformModule.pageCache,
new StoreCopyClient( coreToCoreClient ), new TxPullClient( coreToCoreClient ),
new TransactionLogCatchUpFactory() );

CoreStateApplier coreStateApplier = new CoreStateApplier( logProvider );
CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, storeFetcher,
coreToCoreClient, logProvider );

InFlightMap<Long,RaftLogEntry> inFlightMap = new InFlightMap<>();

// Strategy for picking a peer to download state from: any member except ourselves.
NotMyselfSelectionStrategy someoneElse = new NotMyselfSelectionStrategy( discoveryService, myself );

CoreState coreState = new CoreState( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(),
config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier,
logProvider, replicationModule.getProgressTracker(), lastFlushedStorage,
replicationModule.getSessionTracker(), someoneElse, coreStateApplier, downloader, inFlightMap,
platformModule.monitors );

dependencies.satisfyDependency( coreState );

// Periodically prunes the raft log via the core state at the configured frequency.
life.add( new PruningScheduler( coreState, platformModule.jobScheduler,
config.get( CoreEdgeClusterSettings.raft_log_pruning_frequency ) ) );

int queueSize = config.get( CoreEdgeClusterSettings.raft_in_queue_size );
int maxBatch = config.get( CoreEdgeClusterSettings.raft_in_queue_max_batch );

// Queues incoming raft messages and feeds them to the raft instance in batches.
BatchingMessageHandler batchingMessageHandler =
new BatchingMessageHandler( consensusModule.raftInstance(), logProvider, queueSize, maxBatch, localDatabase, coreState );

long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout );

// Waits until this member is a caught-up cluster member; the 4x multiplier
// presumably allows several election rounds before checking — TODO confirm.
MembershipWaiter membershipWaiter =
new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, batchingMessageHandler, logProvider );
long joinCatchupTimeout = config.get( CoreEdgeClusterSettings.join_catch_up_timeout );
membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter,
joinCatchupTimeout, consensusModule.raftInstance(), logProvider );

// Dedicated thread (NEW_THREAD scheduling) that continuously drains the
// batching handler's queue.
life.add( new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ),
batchingMessageHandler ) );

// Route inbound raft messages (after logging) into the batching handler.
loggingRaftInbound.registerHandler( batchingMessageHandler );

// Serves store copies and transaction pulls to other cluster members.
CatchupServer catchupServer = new CatchupServer( logProvider, localDatabase,
platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ),
new DataSourceSupplier( platformModule ), new CheckpointerSupplier( platformModule.dependencies ),
coreState, config.get( CoreEdgeClusterSettings.transaction_listen_address ), platformModule.monitors );

// Add-order defines start-order: core state must be ready before the servers
// that depend on it accept connections.
startupLifecycle = new LifeSupport();
startupLifecycle.add( coreState );
startupLifecycle.add( raftServer );
startupLifecycle.add( catchupServer );
}
}
Expand Up @@ -65,6 +65,7 @@
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.LogProvider;

import static java.time.Clock.systemUTC;
Expand All @@ -73,11 +74,12 @@ public class ConsensusModule
{
private final MonitoredRaftLog raftLog;
private final RaftInstance raftInstance;
private final DelayedRenewableTimeoutService raftTimeoutService;
private final RaftMembershipManager raftMembershipManager;

public ConsensusModule( MemberId myself, final PlatformModule platformModule,
RaftOutbound raftOutbound, File clusterStateDirectory,
DelayedRenewableTimeoutService raftTimeoutService,
CoreTopologyService discoveryService, long recoverFromIndex )
CoreTopologyService discoveryService )
{
final Dependencies dependencies = platformModule.dependencies;
final Config config = platformModule.config;
Expand Down Expand Up @@ -156,11 +158,10 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule,
SendToMyself leaderOnlyReplicator =
new SendToMyself( myself, loggingOutbound );

RaftMembershipManager raftMembershipManager =
new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider,
expectedClusterSize, electionTimeout1, systemUTC(),
config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage,
recoverFromIndex );
raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider,
expectedClusterSize, electionTimeout1, systemUTC(),
config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage
);

life.add( raftMembershipManager );

Expand All @@ -170,6 +171,8 @@ expectedClusterSize, electionTimeout1, systemUTC(),
config.get( CoreEdgeClusterSettings.catchup_batch_size ),
config.get( CoreEdgeClusterSettings.log_shipping_max_lag ), inFlightMap );

raftTimeoutService = new DelayedRenewableTimeoutService( systemUTC(), logProvider );

raftInstance =
new RaftInstance( myself, termState, voteState, raftLog, electionTimeout1,
heartbeatInterval, raftTimeoutService, loggingOutbound, logProvider, raftMembershipManager,
Expand Down Expand Up @@ -237,4 +240,14 @@ public RaftInstance raftInstance()
{
return raftInstance;
}

public Lifecycle raftTimeoutService()
{
return raftTimeoutService;
}

public RaftMembershipManager raftMembershipManager()
{
return raftMembershipManager;
}
}
Expand Up @@ -58,7 +58,7 @@ public class RaftMembershipManager extends LifecycleAdapter implements RaftMembe
private final RaftGroup.Builder<MemberId> memberSetBuilder;
private final ReadableRaftLog raftLog;
private final Log log;
private final long recoverFromIndex;
private long recoverFromIndex = -1;

private final StateStorage<RaftMembershipState> storage;
private final RaftMembershipState state;
Expand All @@ -72,8 +72,9 @@ public class RaftMembershipManager extends LifecycleAdapter implements RaftMembe
private Set<MemberId> additionalReplicationMembers = new HashSet<>();

public RaftMembershipManager( SendToMyself sendToMyself, RaftGroup.Builder<MemberId> memberSetBuilder,
ReadableRaftLog raftLog, LogProvider logProvider, int expectedClusterSize, long electionTimeout,
Clock clock, long catchupTimeout, StateStorage<RaftMembershipState> membershipStorage, long recoverFromIndex )
ReadableRaftLog raftLog, LogProvider logProvider, int expectedClusterSize,
long electionTimeout, Clock clock, long catchupTimeout,
StateStorage<RaftMembershipState> membershipStorage )
{
this.sendToMyself = sendToMyself;
this.memberSetBuilder = memberSetBuilder;
Expand All @@ -83,11 +84,15 @@ public RaftMembershipManager( SendToMyself sendToMyself, RaftGroup.Builder<Membe
this.state = membershipStorage.getInitialState();

this.log = logProvider.getLog( getClass() );
this.recoverFromIndex = recoverFromIndex;
this.membershipChanger = new RaftMembershipChanger( raftLog, clock,
electionTimeout, logProvider, catchupTimeout, this );
}

public void setRecoverFromIndex( long recoverFromIndex )
{
this.recoverFromIndex = recoverFromIndex;
}

@Override
public void start() throws Throwable
{
Expand Down
Expand Up @@ -19,87 +19,25 @@
*/
package org.neo4j.coreedge.server.core;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import org.neo4j.coreedge.catchup.CatchupServer;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory;
import org.neo4j.coreedge.raft.state.CoreState;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import org.neo4j.kernel.lifecycle.Lifecycle;

public class CoreStartupProcess
{
public static LifeSupport createLifeSupport( DataSourceManager dataSourceManager,
ReplicatedIdGeneratorFactory idGeneratorFactory, RaftInstance raft, CoreState coreState,
RaftServer raftServer, CatchupServer catchupServer,
DelayedRenewableTimeoutService raftTimeoutService, MembershipWaiter membershipWaiter,
long joinCatchupTimeout, LogProvider logProvider )
ReplicatedIdGeneratorFactory idGeneratorFactory, Lifecycle raftTimeoutService, Lifecycle coreServerStartupLifecycle,
MembershipWaiterLifecycle membershipWaiterLifecycle )
{
LifeSupport services = new LifeSupport();
services.add( dataSourceManager );
services.add( idGeneratorFactory );
services.add( coreState );
services.add( raftServer );
services.add( catchupServer );
services.add( coreServerStartupLifecycle );
services.add( raftTimeoutService );
services.add( new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, raft, logProvider ) );
services.add( membershipWaiterLifecycle );

return services;
}

private static class MembershipWaiterLifecycle extends LifecycleAdapter
{
private final MembershipWaiter membershipWaiter;
private final Long joinCatchupTimeout;
private final RaftInstance raft;
private final Log log;

private MembershipWaiterLifecycle( MembershipWaiter membershipWaiter, Long joinCatchupTimeout,
RaftInstance raft, LogProvider logProvider )
{
this.membershipWaiter = membershipWaiter;
this.joinCatchupTimeout = joinCatchupTimeout;
this.raft = raft;
this.log = logProvider.getLog( getClass() );
}

@Override
public void start() throws Throwable
{
CompletableFuture<Boolean> caughtUp = membershipWaiter.waitUntilCaughtUpMember( raft.state() );

try
{
caughtUp.get( joinCatchupTimeout, MILLISECONDS );
}
catch ( ExecutionException e )
{
log.error( "Server failed to join cluster", e.getCause() );
throw e.getCause();
}
catch ( InterruptedException | TimeoutException e )
{
String message =
format( "Server failed to join cluster within catchup time limit [%d ms]", joinCatchupTimeout );
log.error( message, e );
throw new RuntimeException( message, e );
}
finally
{
caughtUp.cancel( true );
}
}
}
}

0 comments on commit 6a19fcd

Please sign in to comment.