Skip to content

Commit

Permalink
core-edge: various cleanup and remove duplicate code for raft logging
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Aug 18, 2016
1 parent 9e5d66f commit 3334435
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 138 deletions.
Expand Up @@ -53,7 +53,7 @@ public class ReplicationModule
private final SessionTracker sessionTracker;

public ReplicationModule( MemberId myself, PlatformModule platformModule, Config config,
ConsensusModule consensusModule, Outbound<MemberId,RaftMessages.RaftMessage> loggingOutbound,
ConsensusModule consensusModule, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
File clusterStateDirectory, FileSystemAbstraction fileSystem, LogProvider logProvider )
{
LifeSupport life = platformModule.life;
Expand All @@ -77,8 +77,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
progressTracker = new ProgressTrackerImpl( myGlobalSession );

replicator = new RaftReplicator( consensusModule.raftMachine(), myself,
loggingOutbound,
sessionPool, progressTracker,
outbound, sessionPool, progressTracker,
new ExponentialBackoffStrategy( 10, SECONDS ) );

}
Expand Down
Expand Up @@ -53,7 +53,6 @@
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.messaging.CoreOutbound;
import org.neo4j.coreedge.messaging.Message;
import org.neo4j.coreedge.messaging.NonBlockingChannels;
import org.neo4j.coreedge.messaging.Outbound;
import org.neo4j.coreedge.messaging.SenderService;
import org.neo4j.helpers.Listeners;
Expand Down Expand Up @@ -81,11 +80,10 @@ public abstract class CoreClient extends LifecycleAdapter implements StoreFileRe
private CompletableFuture<CoreSnapshot> coreSnapshotFuture;

public CoreClient( LogProvider logProvider, ChannelInitializer<SocketChannel> channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService, long logThresholdMillis )
int maxQueueSize, TopologyService discoveryService, long logThresholdMillis )
{
this.logProvider = logProvider;
this.senderService =
new SenderService( channelInitializer, logProvider, monitors, maxQueueSize, nonBlockingChannels );
this.senderService = new SenderService( channelInitializer, logProvider, monitors, maxQueueSize );
this.outbound = new CoreOutbound( discoveryService, senderService, logProvider, logThresholdMillis );
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
}
Expand Down
Expand Up @@ -50,10 +50,9 @@
public class CoreToCoreClient extends CoreClient
{
public CoreToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, CoreTopologyService discoveryService,
long logThresholdMillis )
int maxQueueSize, CoreTopologyService discoveryService, long logThresholdMillis )
{
super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService,
super( logProvider, channelInitializer, monitors, maxQueueSize, discoveryService,
logThresholdMillis );
}

Expand Down
Expand Up @@ -47,10 +47,9 @@
public class EdgeToCoreClient extends CoreClient
{
public EdgeToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService,
long logThresholdMillis )
int maxQueueSize, TopologyService discoveryService, long logThresholdMillis )
{
super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService,
super( logProvider, channelInitializer, monitors, maxQueueSize, discoveryService,
logThresholdMillis );
}

Expand Down
Expand Up @@ -26,35 +26,34 @@
import java.io.PrintWriter;
import java.util.function.Supplier;

import org.neo4j.coreedge.core.server.CoreServerModule;
import org.neo4j.coreedge.core.state.machines.CoreStateMachinesModule;
import org.neo4j.coreedge.ReplicationModule;
import org.neo4j.coreedge.catchup.storecopy.CopiedStoreRecovery;
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.StoreFiles;
import org.neo4j.coreedge.catchup.storecopy.CopiedStoreRecovery;
import org.neo4j.coreedge.core.state.storage.MemberIdStorage;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.core.consensus.ConsensusModule;
import org.neo4j.coreedge.core.consensus.RaftMachine;
import org.neo4j.coreedge.core.consensus.RaftMessages;
import org.neo4j.coreedge.discovery.procedures.CoreRoleProcedure;
import org.neo4j.coreedge.messaging.CoreReplicatedContentMarshal;
import org.neo4j.coreedge.messaging.LoggingOutbound;
import org.neo4j.coreedge.messaging.Outbound;
import org.neo4j.coreedge.messaging.RaftChannelInitializer;
import org.neo4j.coreedge.messaging.RaftOutbound;
import org.neo4j.coreedge.core.consensus.roles.Role;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.identity.MemberId.MemberIdMarshal;
import org.neo4j.coreedge.messaging.NonBlockingChannels;
import org.neo4j.coreedge.messaging.SenderService;
import org.neo4j.coreedge.core.server.CoreServerModule;
import org.neo4j.coreedge.core.state.machines.CoreStateMachinesModule;
import org.neo4j.coreedge.core.state.storage.MemberIdStorage;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.discovery.procedures.AcquireEndpointsProcedure;
import org.neo4j.coreedge.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.coreedge.discovery.procedures.CoreRoleProcedure;
import org.neo4j.coreedge.discovery.procedures.DiscoverEndpointAcquisitionServersProcedure;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.identity.MemberId.MemberIdMarshal;
import org.neo4j.coreedge.logging.BetterMessageLogger;
import org.neo4j.coreedge.logging.MessageLogger;
import org.neo4j.coreedge.logging.NullMessageLogger;
import org.neo4j.coreedge.messaging.CoreReplicatedContentMarshal;
import org.neo4j.coreedge.messaging.LoggingOutbound;
import org.neo4j.coreedge.messaging.Outbound;
import org.neo4j.coreedge.messaging.RaftChannelInitializer;
import org.neo4j.coreedge.messaging.RaftOutbound;
import org.neo4j.coreedge.messaging.SenderService;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
Expand Down Expand Up @@ -102,7 +101,6 @@ public class EnterpriseCoreEditionModule extends EditionModule
private final ConsensusModule consensusModule;
private final CoreTopologyService discoveryService;
private final LogProvider logProvider;
private final CoreStateMachinesModule coreStateMachinesModule;

public enum RaftLogImplementation
{
Expand All @@ -125,8 +123,8 @@ public void registerProcedures( Procedures procedures )
}
}

EnterpriseCoreEditionModule( final PlatformModule platformModule, DiscoveryServiceFactory
discoveryServiceFactory )
EnterpriseCoreEditionModule( final PlatformModule platformModule,
final DiscoveryServiceFactory discoveryServiceFactory )
{
final Dependencies dependencies = platformModule.dependencies;
final Config config = platformModule.config;
Expand Down Expand Up @@ -168,39 +166,26 @@ public void registerProcedures( Procedures procedures )
long logThresholdMillis = config.get( CoreEdgeClusterSettings.unknown_address_logging_throttle );
int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size );

final SenderService senderService =
new SenderService( new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider ),
logProvider, platformModule.monitors,
maxQueueSize, new NonBlockingChannels() );
life.add( senderService );
final SenderService raftSender = new SenderService(
new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider ),
logProvider, platformModule.monitors, maxQueueSize );
life.add( raftSender );

final MessageLogger<MemberId> messageLogger;
if ( config.get( CoreEdgeClusterSettings.raft_messages_log_enable ) )
{
File logsDir = config.get( GraphDatabaseSettings.logs_directory );
messageLogger = life.add( new BetterMessageLogger<>( myself, raftMessagesLog( logsDir ) ) );
}
else
{
messageLogger = new NullMessageLogger<>();
}
final MessageLogger<MemberId> messageLogger = createMessageLogger( config, life, myself );

RaftOutbound raftOutbound =
new RaftOutbound( discoveryService, senderService, localDatabase, logProvider, logThresholdMillis );
Outbound<MemberId,RaftMessages.RaftMessage> loggingOutbound = new LoggingOutbound<>(
raftOutbound, myself, messageLogger );
Outbound<MemberId,RaftMessages.RaftMessage> raftOutbound = new LoggingOutbound<>(
new RaftOutbound( discoveryService, raftSender, localDatabase, logProvider, logThresholdMillis ),
myself, messageLogger );

consensusModule =
new ConsensusModule( myself, platformModule, raftOutbound, clusterStateDirectory, discoveryService );
consensusModule = new ConsensusModule( myself, platformModule, raftOutbound, clusterStateDirectory, discoveryService );

dependencies.satisfyDependency( consensusModule.raftMachine() );

ReplicationModule replicationModule = new ReplicationModule( myself, platformModule, config, consensusModule,
loggingOutbound, clusterStateDirectory,
fileSystem, logProvider );
raftOutbound, clusterStateDirectory, fileSystem, logProvider );

coreStateMachinesModule = new CoreStateMachinesModule( myself, platformModule, clusterStateDirectory, config, replicationModule.getReplicator(), consensusModule.raftMachine(),
dependencies, localDatabase );
CoreStateMachinesModule coreStateMachinesModule = new CoreStateMachinesModule( myself, platformModule, clusterStateDirectory, config,
replicationModule.getReplicator(), consensusModule.raftMachine(), dependencies, localDatabase );

this.idGeneratorFactory = coreStateMachinesModule.idGeneratorFactory;
this.idTypeConfigurationProvider = coreStateMachinesModule.idTypeConfigurationProvider;
Expand All @@ -222,6 +207,21 @@ public void registerProcedures( Procedures procedures )
life.add( coreServerModule.membershipWaiterLifecycle );
}

private MessageLogger<MemberId> createMessageLogger( Config config, LifeSupport life, MemberId myself )
{
final MessageLogger<MemberId> messageLogger;
if ( config.get( CoreEdgeClusterSettings.raft_messages_log_enable ) )
{
File logsDir = config.get( GraphDatabaseSettings.logs_directory );
messageLogger = life.add( new BetterMessageLogger<>( myself, raftMessagesLog( logsDir ) ) );
}
else
{
messageLogger = new NullMessageLogger<>();
}
return messageLogger;
}

private void editionInvariants( PlatformModule platformModule, Dependencies dependencies, Config config,
LogService logging, LifeSupport life )
{
Expand Down
Expand Up @@ -20,10 +20,7 @@
package org.neo4j.coreedge.core.consensus;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.EnterpriseCoreEditionModule;
Expand All @@ -47,17 +44,8 @@
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.RaftDiscoveryServiceConnector;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.logging.BetterMessageLogger;
import org.neo4j.coreedge.logging.MessageLogger;
import org.neo4j.coreedge.logging.NullMessageLogger;
import org.neo4j.coreedge.messaging.CoreReplicatedContentMarshal;
import org.neo4j.coreedge.messaging.LoggingOutbound;
import org.neo4j.coreedge.messaging.NonBlockingChannels;
import org.neo4j.coreedge.messaging.Outbound;
import org.neo4j.coreedge.messaging.RaftChannelInitializer;
import org.neo4j.coreedge.messaging.RaftOutbound;
import org.neo4j.coreedge.messaging.SenderService;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.PlatformModule;
Expand All @@ -81,7 +69,7 @@ public class ConsensusModule
private final RaftMembershipManager raftMembershipManager;
private final InFlightMap<Long,RaftLogEntry> inFlightMap = new InFlightMap<>();

public ConsensusModule( MemberId myself, final PlatformModule platformModule, RaftOutbound raftOutbound,
public ConsensusModule( MemberId myself, final PlatformModule platformModule, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
File clusterStateDirectory, CoreTopologyService discoveryService )
{
final Config config = platformModule.config;
Expand All @@ -92,30 +80,11 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule, Ra
LogProvider logProvider = logging.getInternalLogProvider();

final CoreReplicatedContentMarshal marshal = new CoreReplicatedContentMarshal();
int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size );
final SenderService senderService =
new SenderService( new RaftChannelInitializer( marshal, logProvider ), logProvider, platformModule.monitors,
maxQueueSize, new NonBlockingChannels() );
life.add( senderService );

final MessageLogger<MemberId> messageLogger;
if ( config.get( CoreEdgeClusterSettings.raft_messages_log_enable ) )
{
File logsDir = config.get( GraphDatabaseSettings.logs_directory );
messageLogger = life.add( new BetterMessageLogger<>( myself, raftMessagesLog( logsDir ) ) );
}
else
{
messageLogger = new NullMessageLogger<>();
}

RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider, platformModule.jobScheduler );

raftLog = new MonitoredRaftLog( underlyingLog, platformModule.monitors );

Outbound<MemberId,RaftMessages.RaftMessage> loggingOutbound = new LoggingOutbound<>(
raftOutbound, myself, messageLogger );

StateStorage<TermState> termState;
StateStorage<VoteState> voteState;
StateStorage<RaftMembershipState> raftMembershipStorage;
Expand Down Expand Up @@ -150,8 +119,7 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule, Ra

MemberIdSetBuilder memberSetBuilder = new MemberIdSetBuilder();

SendToMyself leaderOnlyReplicator =
new SendToMyself( myself, loggingOutbound );
SendToMyself leaderOnlyReplicator = new SendToMyself( myself, outbound );

raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider,
expectedClusterSize, electionTimeout, systemUTC(),
Expand All @@ -161,7 +129,7 @@ expectedClusterSize, electionTimeout, systemUTC(),
life.add( raftMembershipManager );

RaftLogShippingManager logShipping =
new RaftLogShippingManager( loggingOutbound, logProvider, raftLog, systemUTC(),
new RaftLogShippingManager( outbound, logProvider, raftLog, systemUTC(),
myself, raftMembershipManager, electionTimeout,
config.get( CoreEdgeClusterSettings.catchup_batch_size ),
config.get( CoreEdgeClusterSettings.log_shipping_max_lag ), inFlightMap );
Expand All @@ -170,7 +138,7 @@ expectedClusterSize, electionTimeout, systemUTC(),

raftMachine =
new RaftMachine( myself, termState, voteState, raftLog, electionTimeout,
heartbeatInterval, raftTimeoutService, loggingOutbound, logProvider, raftMembershipManager,
heartbeatInterval, raftTimeoutService, outbound, logProvider, raftMembershipManager,
logShipping, inFlightMap, platformModule.monitors );

life.add( new RaftDiscoveryServiceConnector( discoveryService, raftMachine ) );
Expand Down Expand Up @@ -211,21 +179,6 @@ private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstra
}
}

private static PrintWriter raftMessagesLog( File logsDir )
{
//noinspection ResultOfMethodCallIgnored
logsDir.mkdirs();
try
{

return new PrintWriter( new FileOutputStream( new File( logsDir, "raft-messages.log" ), true ) );
}
catch ( FileNotFoundException e )
{
throw new RuntimeException( e );
}
}

public RaftLog raftLog()
{
return raftLog;
Expand Down
Expand Up @@ -126,7 +126,7 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C

CoreToCoreClient coreToCoreClient = life.add(
new CoreToCoreClient( logProvider, channelInitializer, platformModule.monitors, maxQueueSize,
nonBlockingChannels, discoveryService, logThresholdMillis ) );
discoveryService, logThresholdMillis ) );
channelInitializer.setOwner( coreToCoreClient );

StoreFetcher storeFetcher = new StoreFetcher( logProvider, fileSystem, platformModule.pageCache,
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.coreedge.discovery;


import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
Expand Down
Expand Up @@ -37,7 +37,6 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
Expand All @@ -53,8 +52,7 @@

class HazelcastClusterTopology
{
// hz client uuid string -> boltAddress string
static final String EDGE_SERVER_BOLT_ADDRESS_MAP_NAME = "edge-servers";
static final String EDGE_SERVER_BOLT_ADDRESS_MAP_NAME = "edge-servers"; // hz client uuid string -> boltAddress string
static final String MEMBER_UUID = "member_uuid";
static final String TRANSACTION_SERVER = "transaction_server";
static final String RAFT_SERVER = "raft_server";
Expand All @@ -71,7 +69,7 @@ static ClusterTopology getClusterTopology( HazelcastInstance hazelcastInstance,
edgeMembers( hazelcastInstance, log ) );
}

static class GetConnectedClients implements Callable<Collection<String>>, Serializable, HazelcastInstanceAware
private static class GetConnectedClients implements Callable<Collection<String>>, Serializable, HazelcastInstanceAware
{
private transient HazelcastInstance instance;

Expand Down
Expand Up @@ -32,8 +32,7 @@ public class RaftDiscoveryServiceConnector extends LifecycleAdapter implements C
private final CoreTopologyService discoveryService;
private final RaftMachine raftMachine;

public RaftDiscoveryServiceConnector( CoreTopologyService discoveryService,
RaftMachine raftMachine )
public RaftDiscoveryServiceConnector( CoreTopologyService discoveryService, RaftMachine raftMachine )
{
this.discoveryService = discoveryService;
this.raftMachine = raftMachine;
Expand Down

0 comments on commit 3334435

Please sign in to comment.