Skip to content

Commit

Permalink
Addressing Multiclustering reviews and general cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
hugofirth committed Mar 11, 2018
1 parent 872c458 commit a1ee8ed
Show file tree
Hide file tree
Showing 15 changed files with 54 additions and 51 deletions.
Expand Up @@ -112,10 +112,10 @@ public class CausalClusteringSettings implements LoadableConfig
public static final Setting<Integer> expected_core_cluster_size = public static final Setting<Integer> expected_core_cluster_size =
setting( "causal_clustering.expected_core_cluster_size", INTEGER, "3" ); setting( "causal_clustering.expected_core_cluster_size", INTEGER, "3" );


//TODO: Document that when using multi-clustering this size refers to the size of the sub-cluster which shares this instance's database name
@Description( "Minimum number of Core machines in the cluster at formation. The expected_core_cluster_size setting is used when bootstrapping the " + @Description( "Minimum number of Core machines in the cluster at formation. The expected_core_cluster_size setting is used when bootstrapping the " +
"cluster on first formation. A cluster will not form without the configured amount of cores and this should in general be configured to the" + "cluster on first formation. A cluster will not form without the configured amount of cores and this should in general be configured to the" +
" full and fixed amount." ) " full and fixed amount. When using multi-clustering (configuring multiple distinct database names across core hosts), this setting is used " +
"to define the minimum size of *each* sub-cluster at formation." )
public static final Setting<Integer> minimum_core_cluster_size_at_formation = public static final Setting<Integer> minimum_core_cluster_size_at_formation =
buildSetting( "causal_clustering.minimum_core_cluster_size_at_formation", INTEGER, expected_core_cluster_size.getDefaultValue() ) buildSetting( "causal_clustering.minimum_core_cluster_size_at_formation", INTEGER, expected_core_cluster_size.getDefaultValue() )
.constraint( min( 2 ) ).build(); .constraint( min( 2 ) ).build();
Expand Down
Expand Up @@ -23,9 +23,11 @@


import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;


// TODO: basics for Serializable
public class LeaderInfo implements Serializable public class LeaderInfo implements Serializable
{ {

private static final long serialVersionUID = 7983780359510842910L;

public static final LeaderInfo INITIAL = new LeaderInfo( null, -1 ); public static final LeaderInfo INITIAL = new LeaderInfo( null, -1 );


private final MemberId memberId; private final MemberId memberId;
Expand Down
Expand Up @@ -29,9 +29,9 @@
*/ */
public interface CoreTopologyService extends TopologyService public interface CoreTopologyService extends TopologyService
{ {
void addCoreTopologyListener( Listener listener ); void addLocalCoreTopologyListener( Listener listener );


void removeCoreTopologyListener( Listener listener ); void removeLocalCoreTopologyListener( Listener listener );


/** /**
* Publishes the cluster ID so that other members might discover it. * Publishes the cluster ID so that other members might discover it.
Expand Down
Expand Up @@ -63,10 +63,12 @@ public class HazelcastClient extends AbstractTopologyService


private JobScheduler.JobHandle keepAliveJob; private JobScheduler.JobHandle keepAliveJob;
private JobScheduler.JobHandle refreshTopologyJob; private JobScheduler.JobHandle refreshTopologyJob;
private JobScheduler.JobHandle refreshRolesJob;


private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>(); private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();
private volatile CoreTopology coreTopology = CoreTopology.EMPTY; private volatile CoreTopology coreTopology = CoreTopology.EMPTY;
private volatile ReadReplicaTopology rrTopology = ReadReplicaTopology.EMPTY; private volatile ReadReplicaTopology rrTopology = ReadReplicaTopology.EMPTY;
private volatile Map<MemberId,RoleInfo> coreRoles = emptyMap();


public HazelcastClient( HazelcastConnector connector, JobScheduler scheduler, LogProvider logProvider, Config config, MemberId myself, public HazelcastClient( HazelcastConnector connector, JobScheduler scheduler, LogProvider logProvider, Config config, MemberId myself,
TopologyServiceRetryStrategy topologyServiceRetryStrategy ) TopologyServiceRetryStrategy topologyServiceRetryStrategy )
Expand All @@ -83,6 +85,7 @@ public HazelcastClient( HazelcastConnector connector, JobScheduler scheduler, Lo
this.groups = config.get( CausalClusteringSettings.server_groups ); this.groups = config.get( CausalClusteringSettings.server_groups );
this.topologyServiceRetryStrategy = resolveStrategy( refreshPeriod, logProvider ); this.topologyServiceRetryStrategy = resolveStrategy( refreshPeriod, logProvider );
this.dbName = config.get( CausalClusteringSettings.database ); this.dbName = config.get( CausalClusteringSettings.database );
this.coreRoles = emptyMap();
} }


private static TopologyServiceRetryStrategy resolveStrategy( long refreshPeriodMillis, LogProvider logProvider ) private static TopologyServiceRetryStrategy resolveStrategy( long refreshPeriodMillis, LogProvider logProvider )
Expand All @@ -96,17 +99,7 @@ private static TopologyServiceRetryStrategy resolveStrategy( long refreshPeriodM
@Override @Override
public Map<MemberId,RoleInfo> allCoreRoles() public Map<MemberId,RoleInfo> allCoreRoles()
{ {
Map<MemberId,RoleInfo> roles = emptyMap(); return coreRoles;

try
{
roles = hzInstance.apply(hz -> HazelcastClusterTopology.getCoreRoles( hz, allCoreServers().members().keySet() ) );
}
catch ( HazelcastInstanceNotActiveException e )
{
log.warn( "Tried to fetch the server roles, but hazelcast was not available.", e);
}
return roles;
} }


@Override @Override
Expand Down Expand Up @@ -148,11 +141,19 @@ private void refreshTopology() throws HazelcastInstanceNotActiveException
catchupAddressMap = extractCatchupAddressesMap( localCoreServers(), localReadReplicas() ); catchupAddressMap = extractCatchupAddressesMap( localCoreServers(), localReadReplicas() );
} }


private void refreshRoles() throws HazelcastInstanceNotActiveException
{
coreRoles = hzInstance.apply(hz -> HazelcastClusterTopology.getCoreRoles( hz, allCoreServers().members().keySet() ) );
}

@Override @Override
public void start() public void start()
{ {
keepAliveJob = scheduler.scheduleRecurring( "KeepAlive", timeToLive / 3, this::keepReadReplicaAlive ); keepAliveJob = scheduler.scheduleRecurring( "KeepAlive", timeToLive / 3, this::keepReadReplicaAlive );
refreshTopologyJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, this::refreshTopology ); refreshTopologyJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, () -> {
this.refreshTopology();
this.refreshRoles();
} );
} }


@Override @Override
Expand Down
Expand Up @@ -54,7 +54,7 @@
import static org.neo4j.helpers.SocketAddressParser.socketAddress; import static org.neo4j.helpers.SocketAddressParser.socketAddress;
import static org.neo4j.helpers.collection.Iterables.asSet; import static org.neo4j.helpers.collection.Iterables.asSet;


public class HazelcastClusterTopology public final class HazelcastClusterTopology
{ {
// per server attributes // per server attributes
private static final String DISCOVERY_SERVER = "discovery_server"; // not currently used private static final String DISCOVERY_SERVER = "discovery_server"; // not currently used
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/ */
package org.neo4j.causalclustering.discovery; package org.neo4j.causalclustering.discovery;


import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
Expand All @@ -33,6 +34,7 @@
import com.hazelcast.core.Hazelcast; import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastException; import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent; import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent; import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener; import com.hazelcast.core.MembershipListener;
Expand Down Expand Up @@ -88,11 +90,12 @@ public class HazelcastCoreTopologyService extends AbstractTopologyService implem
private volatile ReadReplicaTopology readReplicaTopology = ReadReplicaTopology.EMPTY; private volatile ReadReplicaTopology readReplicaTopology = ReadReplicaTopology.EMPTY;
private volatile CoreTopology coreTopology = CoreTopology.EMPTY; private volatile CoreTopology coreTopology = CoreTopology.EMPTY;
private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>(); private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();
private volatile Map<MemberId,RoleInfo> coreRoles = Collections.emptyMap();


private Thread startingThread; private Thread startingThread;
private volatile boolean stopped; private volatile boolean stopped;


protected HazelcastCoreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler, HazelcastCoreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler,
LogProvider logProvider, LogProvider userLogProvider, HostnameResolver hostnameResolver, LogProvider logProvider, LogProvider userLogProvider, HostnameResolver hostnameResolver,
TopologyServiceRetryStrategy topologyServiceRetryStrategy ) TopologyServiceRetryStrategy topologyServiceRetryStrategy )
{ {
Expand All @@ -109,14 +112,14 @@ protected HazelcastCoreTopologyService( Config config, MemberId myself, JobSched
} }


@Override @Override
public void addCoreTopologyListener( Listener listener ) public void addLocalCoreTopologyListener( Listener listener )
{ {
listenerService.addCoreTopologyListener( listener ); listenerService.addCoreTopologyListener( listener );
listener.onCoreTopologyChange( localCoreServers() ); listener.onCoreTopologyChange( localCoreServers() );
} }


@Override @Override
public void removeCoreTopologyListener( Listener listener ) public void removeLocalCoreTopologyListener( Listener listener )
{ {
listenerService.removeCoreTopologyListener( listener ); listenerService.removeCoreTopologyListener( listener );
} }
Expand All @@ -138,11 +141,9 @@ public void setLeader( LeaderInfo leaderInfo, String dbName )
} }


@Override @Override
public Map<MemberId,RoleInfo> allCoreRoles() throws InterruptedException public Map<MemberId,RoleInfo> allCoreRoles()
{ {
waitOnHazelcastInstanceCreation(); return coreRoles;

return HazelcastClusterTopology.getCoreRoles( hazelcastInstance, allCoreServers().members().keySet() );
} }


@Override @Override
Expand Down Expand Up @@ -350,11 +351,14 @@ private Optional<AdvertisedSocketAddress> retrieveSocketAddress( MemberId member


private void refreshRoles() throws InterruptedException private void refreshRoles() throws InterruptedException
{ {
waitOnHazelcastInstanceCreation();

if ( leaderInfo != null && leaderInfo.memberId().equals( myself ) ) if ( leaderInfo != null && leaderInfo.memberId().equals( myself ) )
{ {
waitOnHazelcastInstanceCreation();
HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName ); HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName );
} }

coreRoles = HazelcastClusterTopology.getCoreRoles( hazelcastInstance, allCoreServers().members().keySet() );
} }


private synchronized void refreshTopology() throws InterruptedException private synchronized void refreshTopology() throws InterruptedException
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;


/** /**
* Makes the Raft aware of changes to the core topology and visa versa * Makes the Raft aware of changes to the core topology and vice versa
*/ */
public class RaftCoreTopologyConnector extends LifecycleAdapter implements CoreTopologyService.Listener, LeaderListener public class RaftCoreTopologyConnector extends LifecycleAdapter implements CoreTopologyService.Listener, LeaderListener
{ {
Expand All @@ -46,7 +46,7 @@ public RaftCoreTopologyConnector( CoreTopologyService coreTopologyService, RaftM
@Override @Override
public void start() public void start()
{ {
coreTopologyService.addCoreTopologyListener( this ); coreTopologyService.addLocalCoreTopologyListener( this );
raftMachine.registerListener( this ); raftMachine.registerListener( this );
} }


Expand Down
Expand Up @@ -43,5 +43,5 @@ public interface TopologyService extends Lifecycle


Optional<AdvertisedSocketAddress> findCatchupAddress( MemberId upstream ); Optional<AdvertisedSocketAddress> findCatchupAddress( MemberId upstream );


Map<MemberId,RoleInfo> allCoreRoles() throws InterruptedException; Map<MemberId,RoleInfo> allCoreRoles();
} }
Expand Up @@ -42,8 +42,6 @@
import org.neo4j.kernel.api.ResourceTracker; import org.neo4j.kernel.api.ResourceTracker;
import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.Context; import org.neo4j.kernel.api.proc.Context;
import org.neo4j.kernel.api.proc.Neo4jTypes;
import org.neo4j.kernel.api.proc.QualifiedName;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


Expand Down Expand Up @@ -84,14 +82,7 @@ public RawIterator<Object[],ProcedureException> apply(
{ {
Map<MemberId,RoleInfo> roleMap = emptyMap(); Map<MemberId,RoleInfo> roleMap = emptyMap();
List<ReadWriteEndPoint> endpoints = new ArrayList<>(); List<ReadWriteEndPoint> endpoints = new ArrayList<>();
try roleMap = topologyService.allCoreRoles();
{
roleMap = topologyService.allCoreRoles();
}
catch ( InterruptedException e )
{
log.warn( "Hazelcast instance not yet available.", e );
}


CoreTopology coreTopology = topologyService.allCoreServers(); CoreTopology coreTopology = topologyService.allCoreServers();
Set<MemberId> coreMembers = coreTopology.members().keySet(); Set<MemberId> coreMembers = coreTopology.members().keySet();
Expand Down
Expand Up @@ -71,14 +71,16 @@ public ClusterBinder( SimpleStorage<ClusterId> clusterIdStorage, CoreTopologySer
} }


/** /**
* This method verifies if the local topology being returned by the discovery service is a viable cluster. * This method verifies if the local topology being returned by the discovery service is a viable cluster
* and should be bootstrapped by this host.
*
* If true, then a) the topology is sufficiently large to form a cluster; & b) this host can bootstrap for * If true, then a) the topology is sufficiently large to form a cluster; & b) this host can bootstrap for
* its configured database. * its configured database.
* *
* @param coreTopology the present state of the local topology, as reported by the discovery service. * @param coreTopology the present state of the local topology, as reported by the discovery service.
* @return Whether or not coreTopology, in its current state, can form a viable cluster * @return Whether or not coreTopology, in its current state, can form a viable cluster
*/ */
private boolean isViableCluster( CoreTopology coreTopology ) private boolean hostShouldBootstrapCluster( CoreTopology coreTopology )
{ {
int memberCount = coreTopology.members().size(); int memberCount = coreTopology.members().size();
if ( memberCount < minCoreHosts ) if ( memberCount < minCoreHosts )
Expand Down Expand Up @@ -130,7 +132,7 @@ public BoundState bindToCluster() throws Throwable
clusterId = topology.clusterId(); clusterId = topology.clusterId();
log.info( "Bound to cluster: " + clusterId ); log.info( "Bound to cluster: " + clusterId );
} }
else if ( isViableCluster( topology ) ) else if ( hostShouldBootstrapCluster( topology ) )
{ {
clusterId = new ClusterId( UUID.randomUUID() ); clusterId = new ClusterId( UUID.randomUUID() );
snapshot = coreBootstrapper.bootstrap( topology.members().keySet() ); snapshot = coreBootstrapper.bootstrap( topology.members().keySet() );
Expand Down
Expand Up @@ -83,7 +83,8 @@ public void shouldSendReplicatedContentToLeader() throws Exception


RaftReplicator replicator = RaftReplicator replicator =
new RaftReplicator( leaderLocator, myself, outbound, sessionPool, new RaftReplicator( leaderLocator, myself, outbound, sessionPool,
capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, availabilityGuard, NullLogProvider.getInstance(), replicationLimit ); capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy,
availabilityGuard, NullLogProvider.getInstance(), replicationLimit );


ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
Thread replicatingThread = replicatingThread( replicator, content, false ); Thread replicatingThread = replicatingThread( replicator, content, false );
Expand All @@ -110,7 +111,8 @@ public void shouldResendAfterTimeout() throws Exception
CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<>(); CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<>();


RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound,
sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, availabilityGuard, NullLogProvider.getInstance(), replicationLimit ); sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy,
availabilityGuard, NullLogProvider.getInstance(), replicationLimit );


ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
Thread replicatingThread = replicatingThread( replicator, content, false ); Thread replicatingThread = replicatingThread( replicator, content, false );
Expand Down Expand Up @@ -165,7 +167,8 @@ public void shouldReleaseSessionWhenFinished() throws Exception
CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<>(); CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<>();


RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound,
sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, availabilityGuard, NullLogProvider.getInstance(), replicationLimit ); sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy,
availabilityGuard, NullLogProvider.getInstance(), replicationLimit );


ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
Thread replicatingThread = replicatingThread( replicator, content, true ); Thread replicatingThread = replicatingThread( replicator, content, true );
Expand Down
Expand Up @@ -64,14 +64,14 @@ public int compareTo( SharedDiscoveryCoreClient o )
} }


@Override @Override
public synchronized void addCoreTopologyListener( Listener listener ) public synchronized void addLocalCoreTopologyListener( Listener listener )
{ {
listenerService.addCoreTopologyListener( listener ); listenerService.addCoreTopologyListener( listener );
listener.onCoreTopologyChange( localCoreServers() ); listener.onCoreTopologyChange( localCoreServers() );
} }


@Override @Override
public void removeCoreTopologyListener( Listener listener ) public void removeLocalCoreTopologyListener( Listener listener )
{ {
listenerService.removeCoreTopologyListener( listener ); listenerService.removeCoreTopologyListener( listener );
} }
Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;


public class SharedDiscoveryService public final class SharedDiscoveryService
{ {
private static final int MIN_DISCOVERY_MEMBERS = 2; private static final int MIN_DISCOVERY_MEMBERS = 2;


Expand Down
Expand Up @@ -448,7 +448,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th
// when the poller is paused, transaction doesn't make it to the read replica // when the poller is paused, transaction doesn't make it to the read replica
try try
{ {
transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) ); transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 15 ) );
fail( "should have thrown exception" ); fail( "should have thrown exception" );
} }
catch ( TransactionFailureException e ) catch ( TransactionFailureException e )
Expand All @@ -458,7 +458,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th


// when the poller is resumed, it does make it to the read replica // when the poller is resumed, it does make it to the read replica
pollingClient.start(); pollingClient.start();
transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) ); transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 15 ) );
} }


private TransactionIdTracker transactionIdTracker( GraphDatabaseAPI database ) private TransactionIdTracker transactionIdTracker( GraphDatabaseAPI database )
Expand Down
Expand Up @@ -352,7 +352,7 @@ private boolean validRoleExists( InvalidArgumentsException e )


private boolean validRoleDoesNotExist( InvalidArgumentsException e ) private boolean validRoleDoesNotExist( InvalidArgumentsException e )
{ {
return !exists && e.getMessage().contains( "RoleInfo '" + roleName + "' does not exist" ); return !exists && e.getMessage().contains( "Role '" + roleName + "' does not exist" );
} }


private boolean userDoesNotExist( InvalidArgumentsException e ) private boolean userDoesNotExist( InvalidArgumentsException e )
Expand Down

0 comments on commit a1ee8ed

Please sign in to comment.