Skip to content

Commit

Permalink
core-edge: introduce a few more modules
Browse files Browse the repository at this point in the history
Identity is now encapsulated behind the IdentityModule.
Discovery is now encapsulated behind the ClusteringModule.

MemberIdStorage now works slightly different as well and the
simple functionality is now even further simplified and extracted
to SimpleStorage.
  • Loading branch information
martinfurmanski committed Aug 18, 2016
1 parent 1d922f3 commit 736fa22
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 88 deletions.
Expand Up @@ -35,8 +35,8 @@
import org.neo4j.coreedge.core.consensus.RaftMessages;
import org.neo4j.coreedge.core.consensus.roles.Role;
import org.neo4j.coreedge.core.server.CoreServerModule;
import org.neo4j.coreedge.core.state.ClusteringModule;
import org.neo4j.coreedge.core.state.machines.CoreStateMachinesModule;
import org.neo4j.coreedge.core.state.storage.MemberIdStorage;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.discovery.procedures.AcquireEndpointsProcedure;
Expand Down Expand Up @@ -95,10 +95,9 @@
public class EnterpriseCoreEditionModule extends EditionModule
{
public static final String CLUSTER_STATE_DIRECTORY_NAME = "cluster-state";
public static final String CORE_MEMBER_ID_NAME = "core-member-id";

private final ConsensusModule consensusModule;
private final CoreTopologyService discoveryService;
private final CoreTopologyService topologyService;
private final LogProvider logProvider;

public enum RaftLogImplementation
Expand All @@ -111,9 +110,9 @@ public void registerProcedures( Procedures procedures )
{
try
{
procedures.register( new DiscoverEndpointAcquisitionServersProcedure( discoveryService, logProvider ) );
procedures.register( new AcquireEndpointsProcedure( discoveryService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new ClusterOverviewProcedure( discoveryService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new DiscoverEndpointAcquisitionServersProcedure( topologyService, logProvider ) );
procedures.register( new AcquireEndpointsProcedure( topologyService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new ClusterOverviewProcedure( topologyService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new CoreRoleProcedure( consensusModule.raftMachine()) );
}
catch ( ProcedureException e )
Expand Down Expand Up @@ -147,20 +146,10 @@ public void registerProcedures( Procedures procedures )

life.add( localDatabase );

MemberIdStorage memberIdStorage = new MemberIdStorage( fileSystem, clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberId.Marshal(), logProvider );
MemberId myself;
try
{
myself = memberIdStorage.readState();
}
catch ( IOException e )
{
throw new RuntimeException( e );
}

discoveryService = discoveryServiceFactory.coreDiscoveryService( config, myself, logProvider );
IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory );

life.add( dependencies.satisfyDependency( discoveryService ) );
ClusteringModule clusteringModule = new ClusteringModule( discoveryServiceFactory, identityModule.myself(), platformModule );
topologyService = clusteringModule.topologyService();

long logThresholdMillis = config.get( CoreEdgeClusterSettings.unknown_address_logging_throttle );
int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size );
Expand All @@ -170,20 +159,20 @@ public void registerProcedures( Procedures procedures )
logProvider, platformModule.monitors, maxQueueSize );
life.add( raftSender );

final MessageLogger<MemberId> messageLogger = createMessageLogger( config, life, myself );
final MessageLogger<MemberId> messageLogger = createMessageLogger( config, life, identityModule.myself() );

Outbound<MemberId,RaftMessages.RaftMessage> raftOutbound = new LoggingOutbound<>(
new RaftOutbound( discoveryService, raftSender, localDatabase, logProvider, logThresholdMillis ),
myself, messageLogger );
new RaftOutbound( topologyService, raftSender, localDatabase, logProvider, logThresholdMillis ),
identityModule.myself(), messageLogger );

consensusModule = new ConsensusModule( myself, platformModule, raftOutbound, clusterStateDirectory, discoveryService );
consensusModule = new ConsensusModule( identityModule.myself(), platformModule, raftOutbound, clusterStateDirectory, topologyService );

dependencies.satisfyDependency( consensusModule.raftMachine() );

ReplicationModule replicationModule = new ReplicationModule( myself, platformModule, config, consensusModule,
ReplicationModule replicationModule = new ReplicationModule( identityModule.myself(), platformModule, config, consensusModule,
raftOutbound, clusterStateDirectory, fileSystem, logProvider );

CoreStateMachinesModule coreStateMachinesModule = new CoreStateMachinesModule( myself, platformModule, clusterStateDirectory, config,
CoreStateMachinesModule coreStateMachinesModule = new CoreStateMachinesModule( identityModule.myself(), platformModule, clusterStateDirectory, config,
replicationModule.getReplicator(), consensusModule.raftMachine(), dependencies, localDatabase );

this.idGeneratorFactory = coreStateMachinesModule.idGeneratorFactory;
Expand All @@ -194,8 +183,8 @@ public void registerProcedures( Procedures procedures )
this.lockManager = coreStateMachinesModule.lockManager;
this.commitProcessFactory = coreStateMachinesModule.commitProcessFactory;

CoreServerModule coreServerModule = new CoreServerModule( myself, platformModule, consensusModule,
coreStateMachinesModule, replicationModule, clusterStateDirectory, discoveryService, localDatabase,
CoreServerModule coreServerModule = new CoreServerModule( identityModule.myself(), platformModule, consensusModule,
coreStateMachinesModule, replicationModule, clusterStateDirectory, topologyService, localDatabase,
messageLogger );

editionInvariants( platformModule, dependencies, config, logging, life );
Expand Down
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.core;

import java.io.File;
import java.io.IOException;
import java.util.UUID;

import org.neo4j.coreedge.core.state.storage.SimpleStorage;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class IdentityModule
{
public static final String CORE_MEMBER_ID_NAME = "core-member-id";
private MemberId myself;

IdentityModule( PlatformModule platformModule, File clusterStateDirectory )
{
FileSystemAbstraction fileSystem = platformModule.fileSystem;
LogProvider logProvider = platformModule.logging.getInternalLogProvider();

Log log = logProvider.getLog( getClass() );

SimpleStorage<MemberId> memberIdStorage = new SimpleStorage<>( fileSystem, clusterStateDirectory,
CORE_MEMBER_ID_NAME, new MemberId.Marshal(), logProvider );

try
{
if ( memberIdStorage.exists() )
{
myself = memberIdStorage.readState();
if ( myself == null )
{
throw new RuntimeException( "I was null" );
}
}
else
{
UUID uuid = UUID.randomUUID();
myself = new MemberId( uuid );
memberIdStorage.writeState( myself );

log.info( String.format( "Generated new id: %s (%s)", myself, uuid ) );
}
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}

MemberId myself()
{
return myself;
}
}
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.core.state;

import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;

public class ClusteringModule
{
private final CoreTopologyService topologyService;

public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, MemberId myself, PlatformModule platformModule )
{
LifeSupport life = platformModule.life;
Config config = platformModule.config;
LogProvider logProvider = platformModule.logging.getInternalLogProvider();
Dependencies dependencies = platformModule.dependencies;

topologyService = discoveryServiceFactory.coreTopologyService( config, myself, logProvider );
life.add( topologyService );

dependencies.satisfyDependency( topologyService ); // for tests
}

public CoreTopologyService topologyService()
{
return topologyService;
}
}
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.coreedge.core.state.machines.id.IdAllocationState;
import org.neo4j.coreedge.core.state.machines.locks.ReplicatedLockTokenState;
import org.neo4j.coreedge.core.state.storage.DurableStateStorage;
import org.neo4j.coreedge.core.state.storage.MemberIdStorage;
import org.neo4j.coreedge.core.state.storage.SimpleStorage;
import org.neo4j.coreedge.core.state.storage.StateMarshal;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.identity.MemberId.Marshal;
Expand All @@ -42,7 +42,7 @@
import static org.neo4j.coreedge.ReplicationModule.LAST_FLUSHED_NAME;
import static org.neo4j.coreedge.ReplicationModule.SESSION_TRACKER_NAME;
import static org.neo4j.coreedge.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME;
import static org.neo4j.coreedge.core.EnterpriseCoreEditionModule.CORE_MEMBER_ID_NAME;
import static org.neo4j.coreedge.core.IdentityModule.CORE_MEMBER_ID_NAME;
import static org.neo4j.coreedge.core.consensus.ConsensusModule.RAFT_MEMBERSHIP_NAME;
import static org.neo4j.coreedge.core.consensus.ConsensusModule.RAFT_TERM_NAME;
import static org.neo4j.coreedge.core.consensus.ConsensusModule.RAFT_VOTE_NAME;
Expand Down Expand Up @@ -84,7 +84,7 @@ public static void main( String[] args ) throws IOException

void dump() throws IOException
{
MemberIdStorage memberIdStorage = new MemberIdStorage( fs, clusterStateDirectory, CORE_MEMBER_ID_NAME,
SimpleStorage<MemberId> memberIdStorage = new SimpleStorage<>( fs, clusterStateDirectory, CORE_MEMBER_ID_NAME,
new Marshal(), NullLogProvider.getInstance() );
if ( memberIdStorage.exists() )
{
Expand Down
Expand Up @@ -21,10 +21,9 @@

import java.io.File;
import java.io.IOException;
import java.util.UUID;

import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.EndOfStreamException;
import org.neo4j.coreedge.messaging.marshalling.ChannelMarshal;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.FlushableChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalFlushableChannel;
Expand All @@ -33,15 +32,15 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class MemberIdStorage
public class SimpleStorage<T>
{
private final FileSystemAbstraction fileSystem;
private final MemberId.Marshal marshal;
private final ChannelMarshal<T> marshal;
private final File file;
private Log log;

public MemberIdStorage( FileSystemAbstraction fileSystem, File directory, String name,
MemberId.Marshal marshal, LogProvider logProvider )
public SimpleStorage( FileSystemAbstraction fileSystem, File directory, String name,
ChannelMarshal<T> marshal, LogProvider logProvider )
{
this.fileSystem = fileSystem;
this.log = logProvider.getLog( getClass() );
Expand All @@ -54,38 +53,27 @@ public boolean exists()
return fileSystem.fileExists( file );
}

public MemberId readState() throws IOException
public T readState() throws IOException
{
if ( exists() )
try ( ReadableClosableChannel channel = new ReadAheadChannel<>( fileSystem.open( file, "r" ) ) )
{
try ( ReadableClosableChannel channel = new ReadAheadChannel<>( fileSystem.open( file, "r" ) ) )
{
MemberId memberId = marshal.unmarshal( channel );
if ( memberId != null )
{
return memberId;
}
}
catch ( EndOfStreamException e )
{
log.error( "End of stream reached" );
}
return marshal.unmarshal( channel );
}
else
catch ( EndOfStreamException e )
{
log.warn( "File does not exist" );
log.error( "End of stream reached: " + file );
throw new IOException( e );
}
}

public void writeState( T state ) throws IOException
{
fileSystem.mkdirs( file.getParentFile() );
fileSystem.deleteFile( file );

UUID uuid = UUID.randomUUID();
MemberId memberId = new MemberId( uuid );
log.info( String.format( "Generated new id: %s (%s)", memberId, uuid ) );
try ( FlushableChannel channel = new PhysicalFlushableChannel( fileSystem.create( file ) ) )
{
marshal.marshal( memberId, channel );
marshal.marshal( state, channel );
}
return memberId;
}
}
Expand Up @@ -27,7 +27,7 @@

public interface DiscoveryServiceFactory
{
CoreTopologyService coreDiscoveryService( Config config, MemberId myself, LogProvider logProvider );
CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider );

TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate );
}
Expand Up @@ -29,7 +29,7 @@
public class HazelcastDiscoveryServiceFactory implements DiscoveryServiceFactory
{
@Override
public CoreTopologyService coreDiscoveryService( Config config, MemberId myself, LogProvider logProvider )
public CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider )
{
makeHazelcastSilent( config );
hazelcastShouldNotPhoneHome();
Expand Down
Expand Up @@ -81,7 +81,7 @@ public int hashCode()
* │leastSignificantBits 8 bytes│
* └──────────────────────────────┘
*/
public static class MemberIdMarshal extends SafeStateMarshal<MemberId>
public static class Marshal extends SafeStateMarshal<MemberId>
{
@Override
public void marshal( MemberId memberId, WritableChannel channel ) throws IOException
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.UUID;

import org.neo4j.coreedge.core.consensus.membership.RaftMembershipState;
import org.neo4j.coreedge.core.consensus.term.TermState;
Expand All @@ -34,7 +35,7 @@
import org.neo4j.coreedge.core.state.machines.id.IdAllocationState;
import org.neo4j.coreedge.core.state.machines.locks.ReplicatedLockTokenState;
import org.neo4j.coreedge.core.state.storage.DurableStateStorage;
import org.neo4j.coreedge.core.state.storage.MemberIdStorage;
import org.neo4j.coreedge.core.state.storage.SimpleStorage;
import org.neo4j.coreedge.core.state.storage.StateMarshal;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.kernel.lifecycle.Lifespan;
Expand All @@ -44,7 +45,7 @@
import static org.junit.Assert.assertEquals;
import static org.neo4j.coreedge.ReplicationModule.LAST_FLUSHED_NAME;
import static org.neo4j.coreedge.ReplicationModule.SESSION_TRACKER_NAME;
import static org.neo4j.coreedge.core.EnterpriseCoreEditionModule.CORE_MEMBER_ID_NAME;
import static org.neo4j.coreedge.core.IdentityModule.CORE_MEMBER_ID_NAME;
import static org.neo4j.coreedge.core.consensus.ConsensusModule.RAFT_MEMBERSHIP_NAME;
import static org.neo4j.coreedge.core.consensus.ConsensusModule.RAFT_TERM_NAME;
import static org.neo4j.coreedge.core.consensus.ConsensusModule.RAFT_VOTE_NAME;
Expand Down Expand Up @@ -75,8 +76,8 @@ public void shouldDumpClusterState() throws Exception

private void createStates() throws IOException
{
MemberIdStorage memberIdStorage = new MemberIdStorage( fsa.get(), clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberId.Marshal(), NullLogProvider.getInstance() );
memberIdStorage.readState();
SimpleStorage<MemberId> memberIdStorage = new SimpleStorage<>( fsa.get(), clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberId.Marshal(), NullLogProvider.getInstance() );
memberIdStorage.writeState( new MemberId( UUID.randomUUID() ) );

createDurableState( LAST_FLUSHED_NAME, new LongIndexMarshal() );
createDurableState( LOCK_TOKEN_NAME, new ReplicatedLockTokenState.Marshal( new MemberId.Marshal() ) );
Expand Down

0 comments on commit 736fa22

Please sign in to comment.