Filter messages by clusterId.
Avoids processing rogue messages that arrive from another cluster.
Replaces the previous implementation, which was based on storeId.

Also renames BindingService to ClusterIdentity, which is now owned by
ClusteringModule.
apcj committed Sep 17, 2016
1 parent 615656d commit fa2ab5e
Showing 19 changed files with 214 additions and 167 deletions.
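
The mechanism, in brief: every Raft message on the wire is now wrapped together with the ClusterId of the sending cluster (the ClusterIdAwareMessage type introduced below), and a receiving member discards any message whose id differs from the one it is bound to. The filtering code itself sits in files whose hunks are not shown on this page, so the following is only an illustrative sketch of that guard; the class and field names (ClusterIdFilteringHandler, boundClusterId, downstream) are assumptions, not identifiers from this commit.

```java
// Sketch only -- not code from this commit. Drops inbound messages that are
// tagged with a foreign ClusterId before they reach the Raft state machine.
class ClusterIdFilteringHandler implements Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage>
{
    private final ClusterId boundClusterId;  // the id this member bound to at startup
    private final Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> downstream;
    private final Log log;

    ClusterIdFilteringHandler( ClusterId boundClusterId,
            Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> downstream, Log log )
    {
        this.boundClusterId = boundClusterId;
        this.downstream = downstream;
        this.log = log;
    }

    @Override
    public void handle( RaftMessages.ClusterIdAwareMessage message )
    {
        if ( !boundClusterId.equals( message.clusterId() ) )
        {
            // Rogue message from another cluster: log it and drop it.
            log.info( "Discarding message from foreign cluster %s: %s", message.clusterId(), message );
            return;
        }
        downstream.handle( message );
    }
}
```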
@@ -151,7 +151,7 @@ protected Log authManagerLog()
IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory );

ClusteringModule clusteringModule = new ClusteringModule( discoveryServiceFactory, identityModule.myself(),
- platformModule );
+ platformModule, clusterStateDirectory );
topologyService = clusteringModule.topologyService();

long logThresholdMillis = config.get( CoreEdgeClusterSettings.unknown_address_logging_throttle );
@@ -164,16 +164,18 @@ protected Log authManagerLog()

final MessageLogger<MemberId> messageLogger = createMessageLogger( config, life, identityModule.myself() );

- Outbound<MemberId,RaftMessages.RaftMessage> raftOutbound = new LoggingOutbound<>(
- new RaftOutbound( topologyService, raftSender, localDatabase, logProvider, logThresholdMillis ),
+ RaftOutbound raftOutbound = new RaftOutbound( topologyService, raftSender, clusteringModule.clusterIdentity(),
+ logProvider, logThresholdMillis );
+ Outbound<MemberId,RaftMessages.RaftMessage> loggingOutbound = new LoggingOutbound<>( raftOutbound,
identityModule.myself(), messageLogger );

- consensusModule = new ConsensusModule( identityModule.myself(), platformModule, raftOutbound, clusterStateDirectory, topologyService );
+ consensusModule = new ConsensusModule( identityModule.myself(), platformModule, loggingOutbound,
+ clusterStateDirectory, topologyService );

dependencies.satisfyDependency( consensusModule.raftMachine() );

ReplicationModule replicationModule = new ReplicationModule( identityModule.myself(), platformModule, config, consensusModule,
- raftOutbound, clusterStateDirectory, fileSystem, logProvider );
+ loggingOutbound, clusterStateDirectory, fileSystem, logProvider );

CoreStateMachinesModule coreStateMachinesModule = new CoreStateMachinesModule( identityModule.myself(), platformModule, clusterStateDirectory, config,
replicationModule.getReplicator(), consensusModule.raftMachine(), dependencies, localDatabase );
Expand All @@ -187,8 +189,8 @@ protected Log authManagerLog()
this.commitProcessFactory = coreStateMachinesModule.commitProcessFactory;
this.accessCapability = new LeaderCanWrite( consensusModule.raftMachine() );

CoreServerModule coreServerModule = new CoreServerModule( identityModule.myself(), platformModule, consensusModule,
coreStateMachinesModule, replicationModule, clusterStateDirectory, topologyService, localDatabase,
CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule,
coreStateMachinesModule, replicationModule, clusterStateDirectory, clusteringModule, localDatabase,
messageLogger, databaseHealthSupplier );

editionInvariants( platformModule, dependencies, config, logging, life );
Expand Down
@@ -34,6 +34,7 @@
public class IdentityModule
{
public static final String CORE_MEMBER_ID_NAME = "core-member-id";
+
private MemberId myself;

IdentityModule( PlatformModule platformModule, File clusterStateDirectory )
@@ -71,7 +72,7 @@ public class IdentityModule
}
}

- MemberId myself()
+ public MemberId myself()
{
return myself;
}
@@ -25,11 +25,11 @@
import java.util.List;
import java.util.Objects;

+ import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.messaging.Message;
import org.neo4j.coreedge.core.consensus.log.RaftLogEntry;
import org.neo4j.coreedge.core.replication.ReplicatedContent;
import org.neo4j.coreedge.identity.MemberId;
- import org.neo4j.coreedge.identity.StoreId;

import static java.lang.String.format;

@@ -633,21 +633,21 @@ public List<ReplicatedContent> contents()
}
}

- class StoreIdAwareMessage implements Message
+ class ClusterIdAwareMessage implements Message
{
- private final StoreId storeId;
+ private final ClusterId clusterId;
private final RaftMessage message;

- public StoreIdAwareMessage( StoreId storeId, RaftMessage message )
+ public ClusterIdAwareMessage( ClusterId clusterId, RaftMessage message )
{
Objects.requireNonNull( message );
- this.storeId = storeId;
+ this.clusterId = clusterId;
this.message = message;
}

- public StoreId storeId()
+ public ClusterId clusterId()
{
- return storeId;
+ return clusterId;
}

public RaftMessage message()
@@ -666,20 +666,20 @@ public boolean equals( Object o )
{
return false;
}
- StoreIdAwareMessage that = (StoreIdAwareMessage) o;
- return Objects.equals( storeId, that.storeId ) && Objects.equals( message, that.message );
+ ClusterIdAwareMessage that = (ClusterIdAwareMessage) o;
+ return Objects.equals( clusterId, that.clusterId ) && Objects.equals( message, that.message );
}

@Override
public int hashCode()
{
- return Objects.hash( storeId, message );
+ return Objects.hash( clusterId, message );
}

@Override
public String toString()
{
- return format( "{storeId: %s, message: %s}", storeId, message );
+ return format( "{clusterId: %s, message: %s}", clusterId, message );
}

}
@@ -38,7 +38,6 @@

import org.neo4j.coreedge.VersionDecoder;
import org.neo4j.coreedge.VersionPrepender;
- import org.neo4j.coreedge.catchup.CatchupServer;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.replication.ReplicatedContent;
import org.neo4j.coreedge.handlers.ExceptionLoggingHandler;
@@ -58,7 +57,7 @@

import static java.lang.String.format;

- public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages.StoreIdAwareMessage>
+ public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages.ClusterIdAwareMessage>
{
private static final Setting<ListenSocketAddress> setting = CoreEdgeClusterSettings.raft_listen_address;
private final ChannelMarshal<ReplicatedContent> marshal;
@@ -68,7 +67,7 @@ public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages
private final Log userLog;
private final Monitors monitors;

- private MessageHandler<RaftMessages.StoreIdAwareMessage> messageHandler;
+ private MessageHandler<RaftMessages.ClusterIdAwareMessage> messageHandler;
private EventLoopGroup workerGroup;
private Channel channel;

@@ -163,24 +162,24 @@ protected void initChannel( SocketChannel ch ) throws Exception
}

@Override
- public void registerHandler( Inbound.MessageHandler<RaftMessages.StoreIdAwareMessage> handler )
+ public void registerHandler( Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> handler )
{
this.messageHandler = handler;
}

- private class RaftMessageHandler extends SimpleChannelInboundHandler<RaftMessages.StoreIdAwareMessage>
+ private class RaftMessageHandler extends SimpleChannelInboundHandler<RaftMessages.ClusterIdAwareMessage>
{
@Override
protected void channelRead0( ChannelHandlerContext channelHandlerContext,
- RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) throws Exception
+ RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage ) throws Exception
{
try
{
- messageHandler.handle( storeIdAwareMessage );
+ messageHandler.handle( clusterIdAwareMessage );
}
catch ( Exception e )
{
- log.error( format( "Failed to process message %s", storeIdAwareMessage ), e );
+ log.error( format( "Failed to process message %s", clusterIdAwareMessage ), e );
}
}
}
@@ -26,24 +26,24 @@

import org.neo4j.coreedge.core.consensus.RaftMessages;
import org.neo4j.coreedge.core.consensus.RaftMessages.RaftMessage;
+ import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.messaging.Inbound.MessageHandler;
- import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.SECONDS;

- public class BatchingMessageHandler implements Runnable, MessageHandler<RaftMessages.StoreIdAwareMessage>
+ public class BatchingMessageHandler implements Runnable, MessageHandler<RaftMessages.ClusterIdAwareMessage>
{
private final Log log;
- private final BlockingQueue<RaftMessages.StoreIdAwareMessage> messageQueue;
+ private final BlockingQueue<RaftMessages.ClusterIdAwareMessage> messageQueue;

private final int maxBatch;
- private final List<RaftMessages.StoreIdAwareMessage> batch;
+ private final List<RaftMessages.ClusterIdAwareMessage> batch;

- private MessageHandler<RaftMessages.StoreIdAwareMessage> handler;
+ private MessageHandler<RaftMessages.ClusterIdAwareMessage> handler;

- public BatchingMessageHandler( MessageHandler<RaftMessages.StoreIdAwareMessage> handler,
+ public BatchingMessageHandler( MessageHandler<RaftMessages.ClusterIdAwareMessage> handler,
int queueSize, int maxBatch, LogProvider logProvider )
{
this.handler = handler;
@@ -55,7 +55,7 @@ public BatchingMessageHandler( MessageHandler<RaftMessages.StoreIdAwareMessage>
}

@Override
- public void handle( RaftMessages.StoreIdAwareMessage message )
+ public void handle( RaftMessages.ClusterIdAwareMessage message )
{
try
{
@@ -70,7 +70,7 @@ public void handle( RaftMessages.StoreIdAwareMessage message )
@Override
public void run()
{
- RaftMessages.StoreIdAwareMessage message = null;
+ RaftMessages.ClusterIdAwareMessage message = null;
try
{
message = messageQueue.poll( 1, SECONDS );
@@ -96,32 +96,32 @@ public void run()
}
}

- private void drain( BlockingQueue<RaftMessages.StoreIdAwareMessage> messageQueue,
- List<RaftMessages.StoreIdAwareMessage> batch, int maxElements )
+ private void drain( BlockingQueue<RaftMessages.ClusterIdAwareMessage> messageQueue,
+ List<RaftMessages.ClusterIdAwareMessage> batch, int maxElements )
{
- List<RaftMessages.StoreIdAwareMessage> tempDraining = new ArrayList<>();
+ List<RaftMessages.ClusterIdAwareMessage> tempDraining = new ArrayList<>();
messageQueue.drainTo( tempDraining, maxElements );

- for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : tempDraining )
+ for ( RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage : tempDraining )
{
- batch.add( storeIdAwareMessage );
+ batch.add( clusterIdAwareMessage );
}
}

- private void collateAndHandleBatch( List<RaftMessages.StoreIdAwareMessage> batch )
+ private void collateAndHandleBatch( List<RaftMessages.ClusterIdAwareMessage> batch )
{
RaftMessages.NewEntry.BatchRequest batchRequest = null;
- StoreId storeId = batch.get( 0 ).storeId();
+ ClusterId clusterId = batch.get( 0 ).clusterId();

- for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : batch )
+ for ( RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage : batch )
{
- if ( batchRequest != null && !storeIdAwareMessage.storeId().equals( storeId ))
+ if ( batchRequest != null && !clusterIdAwareMessage.clusterId().equals( clusterId ))
{
- handler.handle( new RaftMessages.StoreIdAwareMessage( storeId, batchRequest ) );
+ handler.handle( new RaftMessages.ClusterIdAwareMessage( clusterId, batchRequest ) );
batchRequest = null;
}
- storeId = storeIdAwareMessage.storeId();
- RaftMessage message = storeIdAwareMessage.message();
+ clusterId = clusterIdAwareMessage.clusterId();
+ RaftMessage message = clusterIdAwareMessage.message();
if ( message instanceof RaftMessages.NewEntry.Request )
{
RaftMessages.NewEntry.Request newEntryRequest = (RaftMessages.NewEntry.Request) message;
@@ -134,13 +134,13 @@ private void collateAndHandleBatch( List<RaftMessages.StoreIdAwareMessage> batch
}
else
{
- handler.handle( storeIdAwareMessage );
+ handler.handle( clusterIdAwareMessage );
}
}

if ( batchRequest != null )
{
- handler.handle( new RaftMessages.StoreIdAwareMessage( storeId, batchRequest ) );
+ handler.handle( new RaftMessages.ClusterIdAwareMessage( clusterId, batchRequest ) );
}
}
}
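
The collateAndHandleBatch logic above coalesces consecutive NewEntry.Request messages into a single NewEntry.BatchRequest, and flushes the batch early whenever the clusterId changes mid-stream, so a batch never mixes messages from two clusters. A minimal wiring sketch, assuming the downstream handler (coreState here) implements MessageHandler<RaftMessages.ClusterIdAwareMessage>; the queue and batch sizes are arbitrary:

```java
// Sketch only: route the Raft server's inbound messages through the batcher.
BatchingMessageHandler batching = new BatchingMessageHandler( coreState, 1024, 8, logProvider );
raftServer.registerHandler( batching );

// run() performs a single poll-and-dispatch cycle, so something must call it
// repeatedly; CoreServerModule uses a ContinuousJob, approximated here by a loop.
new Thread( () ->
{
    while ( !Thread.currentThread().isInterrupted() )
    {
        batching.run();
    }
}, "raft-batch-handler" ).start();
```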
@@ -34,27 +34,23 @@
import org.neo4j.coreedge.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.coreedge.catchup.tx.TxPullClient;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
+ import org.neo4j.coreedge.core.IdentityModule;
import org.neo4j.coreedge.core.consensus.ConsensusModule;
import org.neo4j.coreedge.core.consensus.ContinuousJob;
import org.neo4j.coreedge.core.consensus.RaftMessages;
import org.neo4j.coreedge.core.consensus.RaftServer;
import org.neo4j.coreedge.core.consensus.log.pruning.PruningScheduler;
import org.neo4j.coreedge.core.consensus.membership.MembershipWaiter;
import org.neo4j.coreedge.core.consensus.membership.MembershipWaiterLifecycle;
- import org.neo4j.coreedge.core.state.BindingService;
+ import org.neo4j.coreedge.core.state.ClusteringModule;
import org.neo4j.coreedge.core.state.CommandApplicationProcess;
- import org.neo4j.coreedge.core.state.CoreBootstrapper;
import org.neo4j.coreedge.core.state.CoreState;
import org.neo4j.coreedge.core.state.CoreStateApplier;
import org.neo4j.coreedge.core.state.LongIndexMarshal;
import org.neo4j.coreedge.core.state.machines.CoreStateMachinesModule;
import org.neo4j.coreedge.core.state.snapshot.CoreStateDownloader;
import org.neo4j.coreedge.core.state.storage.DurableStateStorage;
- import org.neo4j.coreedge.core.state.storage.SimpleFileStorage;
- import org.neo4j.coreedge.core.state.storage.SimpleStorage;
import org.neo4j.coreedge.core.state.storage.StateStorage;
- import org.neo4j.coreedge.discovery.CoreTopologyService;
- import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.logging.MessageLogger;
import org.neo4j.coreedge.messaging.CoreReplicatedContentMarshal;
@@ -73,21 +69,19 @@
import org.neo4j.logging.LogProvider;
import org.neo4j.time.Clocks;

- import static java.lang.Thread.sleep;
-
import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD;

public class CoreServerModule
{
- private static final String CLUSTER_ID_NAME = "cluster-id";
+ public static final String CLUSTER_ID_NAME = "cluster-id";
public static final String LAST_FLUSHED_NAME = "last-flushed";

public final MembershipWaiterLifecycle membershipWaiterLifecycle;

- public CoreServerModule( MemberId myself, final PlatformModule platformModule, ConsensusModule consensusModule,
- CoreStateMachinesModule coreStateMachinesModule, ReplicationModule replicationModule,
- File clusterStateDirectory, CoreTopologyService discoveryService,
- LocalDatabase localDatabase, MessageLogger<MemberId> messageLogger, Supplier<DatabaseHealth> dbHealthSupplier )
+ public CoreServerModule( IdentityModule identityModule, final PlatformModule platformModule, ConsensusModule consensusModule,
+ CoreStateMachinesModule coreStateMachinesModule, ReplicationModule replicationModule,
+ File clusterStateDirectory, ClusteringModule clusteringModule,
+ LocalDatabase localDatabase, MessageLogger<MemberId> messageLogger, Supplier<DatabaseHealth> dbHealthSupplier )
{
final Dependencies dependencies = platformModule.dependencies;
final Config config = platformModule.config;
@@ -113,11 +107,11 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C
RaftServer raftServer =
new RaftServer( new CoreReplicatedContentMarshal(), config, logProvider, userLogProvider, monitors );

- LoggingInbound<RaftMessages.StoreIdAwareMessage> loggingRaftInbound =
- new LoggingInbound<>( raftServer, messageLogger, myself );
+ LoggingInbound<RaftMessages.ClusterIdAwareMessage> loggingRaftInbound =
+ new LoggingInbound<>( raftServer, messageLogger, identityModule.myself() );

- CatchUpClient catchUpClient =
- life.add( new CatchUpClient( discoveryService, logProvider, Clocks.systemClock(), monitors ) );
+ CatchUpClient catchUpClient = life.add( new CatchUpClient( clusteringModule.topologyService(), logProvider,
+ Clocks.systemClock(), monitors ) );

StoreFetcher storeFetcher = new StoreFetcher( logProvider, fileSystem, platformModule.pageCache,
new StoreCopyClient( catchUpClient ), new TxPullClient( catchUpClient, platformModule.monitors ),
@@ -130,20 +124,10 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C
CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, storeFetcher,
catchUpClient, logProvider, copiedStoreRecovery );

- SimpleStorage<ClusterId> clusterIdStorage = new SimpleFileStorage<>( fileSystem, clusterStateDirectory,
- CLUSTER_ID_NAME, new ClusterId.Marshal(), logProvider );
-
- CoreBootstrapper coreBootstrapper = new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache,
- fileSystem, config );
-
- BindingService bindingService = new BindingService( clusterIdStorage, discoveryService, logProvider,
- Clocks.systemClock(), () -> sleep( 100 ), 300_000, coreBootstrapper );
-
CoreState coreState = new CoreState(
- consensusModule.raftMachine(), localDatabase,
+ consensusModule.raftMachine(), localDatabase, clusteringModule.clusterIdentity(),
logProvider,
downloader,
- bindingService,
new CommandApplicationProcess( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(),
config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ),
@@ -165,7 +149,7 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C
long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout );

MembershipWaiter membershipWaiter =
- new MembershipWaiter( myself, platformModule.jobScheduler, dbHealthSupplier,
+ new MembershipWaiter( identityModule.myself(), platformModule.jobScheduler, dbHealthSupplier,
electionTimeout * 4, logProvider );
long joinCatchupTimeout = config.get( CoreEdgeClusterSettings.join_catch_up_timeout );
membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter,
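
The last hunk above removes the block where CoreServerModule assembled clusterIdStorage, CoreBootstrapper and BindingService; per the commit message, that responsibility moves into ClusteringModule, which now hands CoreState and RaftOutbound a ClusterIdentity. The ClusterIdentity source itself is among the files not shown on this page, so this is only a guess at its shape, reusing the SimpleStorage<ClusterId> persistence visible in the removed lines and assuming SimpleStorage offers exists/readState/writeState (the bindToCluster method name is likewise an assumption):

```java
import java.util.UUID;

// Hypothetical sketch of the renamed component -- not code from this commit.
class ClusterIdentity
{
    private final SimpleStorage<ClusterId> clusterIdStorage;
    private ClusterId clusterId;

    ClusterIdentity( SimpleStorage<ClusterId> clusterIdStorage )
    {
        this.clusterIdStorage = clusterIdStorage;
    }

    ClusterId clusterId()
    {
        return clusterId;
    }

    // Called once at startup: reuse the persisted id, or mint and persist a new one.
    void bindToCluster() throws Exception
    {
        if ( clusterIdStorage.exists() )
        {
            clusterId = clusterIdStorage.readState();
        }
        else
        {
            clusterId = new ClusterId( UUID.randomUUID() );
            clusterIdStorage.writeState( clusterId );
        }
    }
}
```

Whatever its exact internals, the wiring in this commit shows its role: both RaftOutbound and CoreState receive clusteringModule.clusterIdentity(), consistent with stamping outgoing messages with the bound id and checking it on inbound ones.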
