Skip to content

Commit

Permalink
Cluster bootup and anti-entropy
Browse files Browse the repository at this point in the history
On cluster bootup, we now allow the repeated calling of refreshCoreTopology
so that the caller will eventually receive the cluster ID and make progress.
In its absense a race can occur where an instance comes online "too quickly"
and fails to get notified about the cluster ID.

Along the same lines we now also poll the cluster for its topology
every minute. This is belt-n-braces since in theory the underlying middleware
should tell our listeners if a topology has changed, but if we miss such a
message this ensures we get refreshed (default time 1 min).
  • Loading branch information
jimwebber committed Sep 26, 2016
1 parent ef16d4f commit 5bdbbd2
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 37 deletions.
Expand Up @@ -259,4 +259,8 @@ public class CoreEdgeClusterSettings
@Description( "RELATIONSHIP_GROUP ID Allocation Space Size" )
public static final Setting<Integer> relationship_group_id_allocation_size =
setting( "core_edge.relationship_group_id_allocation_size", INTEGER, "1024" );

@Description( "Time between scanning the cluster to refresh current server's view of topology" )
public static final Setting<Long> cluster_topology_refresh =
setting( "core_edge.cluster_topology_refresh", DURATION, "1m", min(1_000L) );
}
Expand Up @@ -37,7 +37,6 @@
import org.neo4j.time.Clocks;

import static java.lang.Thread.sleep;

import static org.neo4j.coreedge.core.server.CoreServerModule.CLUSTER_ID_NAME;

public class ClusteringModule
Expand All @@ -46,7 +45,7 @@ public class ClusteringModule
private final ClusterIdentity clusterIdentity;

public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, MemberId myself,
PlatformModule platformModule, File clusterStateDirectory )
PlatformModule platformModule, File clusterStateDirectory )
{
LifeSupport life = platformModule.life;
Config config = platformModule.config;
Expand All @@ -55,20 +54,22 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member
Dependencies dependencies = platformModule.dependencies;
FileSystemAbstraction fileSystem = platformModule.fileSystem;

topologyService = discoveryServiceFactory.coreTopologyService( config, myself, logProvider, userLogProvider );
topologyService = discoveryServiceFactory
.coreTopologyService( config, myself, platformModule.jobScheduler, logProvider, userLogProvider );

life.add( topologyService );

dependencies.satisfyDependency( topologyService ); // for tests

SimpleStorage<ClusterId> clusterIdStorage = new SimpleFileStorage<>( fileSystem, clusterStateDirectory,
CLUSTER_ID_NAME, new ClusterId.Marshal(), logProvider );
SimpleStorage<ClusterId> clusterIdStorage =
new SimpleFileStorage<>( fileSystem, clusterStateDirectory, CLUSTER_ID_NAME, new ClusterId.Marshal(),
logProvider );

CoreBootstrapper coreBootstrapper = new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache,
fileSystem, config );
CoreBootstrapper coreBootstrapper =
new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache, fileSystem, config );

clusterIdentity = new ClusterIdentity( clusterIdStorage, topologyService, logProvider,
Clocks.systemClock(), () -> sleep( 100 ), 300_000, coreBootstrapper );
clusterIdentity = new ClusterIdentity( clusterIdStorage, topologyService, logProvider, Clocks.systemClock(),
() -> sleep( 100 ), 300_000, coreBootstrapper );
}

public CoreTopologyService topologyService()
Expand Down
Expand Up @@ -35,7 +35,9 @@ public interface CoreTopologyService extends TopologyService
*
* @return True if the cluster ID was successfully CAS:ed, otherwise false.
*/
boolean casClusterId( ClusterId clusterId );
boolean setClusterId( ClusterId clusterId );

void refreshCoreTopology();

interface Listener
{
Expand Down
Expand Up @@ -23,13 +23,14 @@
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.logging.LogProvider;

public interface DiscoveryServiceFactory
{
CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider,
LogProvider userLogProvider );
CoreTopologyService coreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler,
LogProvider logProvider, LogProvider userLogProvider );

TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate );
TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider,
DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate );
}
Expand Up @@ -36,6 +36,7 @@
import com.hazelcast.map.impl.MapListenerAdapter;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.identity.ClusterId;
Expand All @@ -44,11 +45,14 @@
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.EDGE_SERVER_BOLT_ADDRESS_MAP_NAME;
import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;

class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService
{
Expand All @@ -57,17 +61,22 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol
private final Log log;
private final Log userLog;
private final CoreTopologyListenerService listenerService;
private final JobScheduler scheduler;
private String membershipRegistrationId;
private String mapRegistrationId;

private JobScheduler.JobHandle jobHandle;

private HazelcastInstance hazelcastInstance;
private EdgeTopology latestEdgeTopology;
private CoreTopology latestCoreTopology;
private volatile EdgeTopology latestEdgeTopology;
private volatile CoreTopology latestCoreTopology;

HazelcastCoreTopologyService( Config config, MemberId myself, LogProvider logProvider, LogProvider userLogProvider )
HazelcastCoreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler, LogProvider logProvider,
LogProvider userLogProvider )
{
this.config = config;
this.myself = myself;
this.scheduler = jobScheduler;
this.listenerService = new CoreTopologyListenerService();
this.log = logProvider.getLog( getClass() );
this.userLog = userLogProvider.getLog( getClass() );
Expand All @@ -81,7 +90,7 @@ public void addCoreTopologyListener( Listener listener )
}

@Override
public boolean casClusterId( ClusterId clusterId )
public boolean setClusterId( ClusterId clusterId )
{
return HazelcastClusterTopology.casClusterId( hazelcastInstance, clusterId );
}
Expand All @@ -97,6 +106,20 @@ public void start()
refreshCoreTopology();
refreshEdgeTopology();
listenerService.notifyListeners( coreServers() );

try
{
scheduler.start();
}
catch ( Throwable throwable )
{
log.debug( "Failed to start job scheduler." );
return;
}

JobScheduler.Group group = new JobScheduler.Group( "Scheduler", POOLED );
jobHandle = this.scheduler.scheduleRecurring( group, new TopologyRefresher(),
config.get( CoreEdgeClusterSettings.cluster_topology_refresh ), TimeUnit.MILLISECONDS );
}

@Override
Expand All @@ -114,6 +137,10 @@ public void stop()
{
log.warn( "Failed to stop Hazelcast", e );
}
finally
{
jobHandle.cancel( true );
}
}

private HazelcastInstance createHazelcastInstance()
Expand Down Expand Up @@ -186,7 +213,8 @@ public CoreTopology coreServers()
return latestCoreTopology;
}

private void refreshCoreTopology()
@Override
public void refreshCoreTopology()
{
latestCoreTopology = HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log );
log.info( "Current core topology is %s", coreServers() );
Expand Down Expand Up @@ -228,4 +256,14 @@ public void memberAttributeChanged( MemberAttributeEvent memberAttributeEvent )
{
}
}

private class TopologyRefresher implements Runnable
{
@Override
public void run()
{
refreshCoreTopology();
refreshEdgeTopology();
}
}
}
Expand Up @@ -24,17 +24,17 @@
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.logging.LogProvider;

public class HazelcastDiscoveryServiceFactory implements DiscoveryServiceFactory
{
@Override
public CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider,
LogProvider userLogProvider )
public CoreTopologyService coreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler,
LogProvider logProvider, LogProvider userLogProvider )
{
configureHazelcast( config );
return new HazelcastCoreTopologyService( config, myself, logProvider, userLogProvider );
return new HazelcastCoreTopologyService( config, myself, jobScheduler, logProvider, userLogProvider );
}

@Override
Expand Down
Expand Up @@ -97,6 +97,7 @@ public void bindToCluster( ThrowingConsumer<CoreSnapshot, Throwable> snapshotIns
if ( clock.millis() < endTime )
{
retryWaiter.apply();
topologyService.refreshCoreTopology();
topology = topologyService.coreServers();
}
else
Expand All @@ -123,7 +124,7 @@ public ClusterId clusterId()

private void publishClusterId( ClusterId localClusterId ) throws BindingException
{
boolean success = topologyService.casClusterId( localClusterId );
boolean success = topologyService.setClusterId( localClusterId );
if ( !success )
{
throw new BindingException( "Failed to publish: " + localClusterId );
Expand All @@ -133,5 +134,4 @@ private void publishClusterId( ClusterId localClusterId ) throws BindingExceptio
log.info( "Published: " + localClusterId );
}
}

}
Expand Up @@ -59,11 +59,17 @@ public synchronized void addCoreTopologyListener( Listener listener )
}

@Override
public boolean casClusterId( ClusterId clusterId )
public boolean setClusterId( ClusterId clusterId )
{
return sharedDiscoveryService.casClusterId( clusterId );
}

@Override
public void refreshCoreTopology()
{
// do nothing
}

@Override
public void start() throws InterruptedException
{
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.logging.LogProvider;

import static java.util.Collections.unmodifiableMap;
Expand All @@ -51,8 +52,8 @@ public class SharedDiscoveryService implements DiscoveryServiceFactory
private ClusterId clusterId;

@Override
public CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider,
LogProvider userLogProvider )
public CoreTopologyService coreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler,
LogProvider logProvider, LogProvider userLogProvider )
{
SharedDiscoveryCoreClient sharedDiscoveryCoreClient = new SharedDiscoveryCoreClient( this, myself, logProvider, config );
sharedDiscoveryCoreClient.onCoreTopologyChange( coreTopology( sharedDiscoveryCoreClient ) );
Expand Down
Expand Up @@ -38,7 +38,7 @@
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.logging.NullLogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -91,8 +91,11 @@ public void shouldDiscoverCompleteTargetSetWithoutDeadlocks() throws Exception

private Callable<Void> createDiscoveryJob( MemberId member, DiscoveryServiceFactory disoveryServiceFactory, Set<MemberId> expectedTargetSet ) throws ExecutionException, InterruptedException
{
Neo4jJobScheduler jobScheduler = new Neo4jJobScheduler();
jobScheduler.init();

CoreTopologyService topologyService = disoveryServiceFactory.coreTopologyService( config(), member,
logProvider, userLogProvider );
jobScheduler, logProvider, userLogProvider );
return sharedClientStarter( topologyService, expectedTargetSet );
}

Expand Down
Expand Up @@ -108,7 +108,7 @@ public void shouldPublishStoredClusterIdIfPreviouslyBound() throws Throwable
ClusterId previouslyBoundClusterId = new ClusterId( UUID.randomUUID() );

CoreTopologyService topologyService = mock( CoreTopologyService.class );
when( topologyService.casClusterId( previouslyBoundClusterId ) ).thenReturn( true );
when( topologyService.setClusterId( previouslyBoundClusterId ) ).thenReturn( true );

StubClusterIdStorage clusterIdStorage = new StubClusterIdStorage();
clusterIdStorage.writeState( previouslyBoundClusterId );
Expand All @@ -121,7 +121,7 @@ public void shouldPublishStoredClusterIdIfPreviouslyBound() throws Throwable
binder.bindToCluster( null );

// then
verify( topologyService ).casClusterId( previouslyBoundClusterId );
verify( topologyService ).setClusterId( previouslyBoundClusterId );
assertEquals( previouslyBoundClusterId, binder.clusterId() );
}

Expand All @@ -132,7 +132,7 @@ public void shouldFailToPublishMismatchingStoredClusterId() throws Throwable
ClusterId previouslyBoundClusterId = new ClusterId( UUID.randomUUID() );

CoreTopologyService topologyService = mock( CoreTopologyService.class );
when( topologyService.casClusterId( previouslyBoundClusterId ) ).thenReturn( false );
when( topologyService.setClusterId( previouslyBoundClusterId ) ).thenReturn( false );

StubClusterIdStorage clusterIdStorage = new StubClusterIdStorage();
clusterIdStorage.writeState( previouslyBoundClusterId );
Expand Down Expand Up @@ -161,7 +161,7 @@ public void shouldBootstrapWhenBootstrappable() throws Throwable

CoreTopologyService topologyService = mock( CoreTopologyService.class );
when( topologyService.coreServers() ).thenReturn( bootstrappableTopology );
when( topologyService.casClusterId( any() ) ).thenReturn( true );
when( topologyService.setClusterId( any() ) ).thenReturn( true );

ClusterIdentity binder = new ClusterIdentity( new StubClusterIdStorage(), topologyService,
NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000,
Expand All @@ -174,7 +174,7 @@ public void shouldBootstrapWhenBootstrappable() throws Throwable

// then
verify( coreBootstrapper ).bootstrap( any() );
verify( topologyService ).casClusterId( binder.clusterId() );
verify( topologyService ).setClusterId( binder.clusterId() );
verify( snapshotInstaller ).accept( any() );
}

Expand Down
Expand Up @@ -72,7 +72,7 @@ private enum DiscoveryService
@Parameterized.Parameters( name = "discovery-{0}" )
public static Collection<DiscoveryService> data()
{
return Arrays.asList( DiscoveryService.SHARED, DiscoveryService.HAZELCAST );
return Arrays.asList( /*DiscoveryService.SHARED, */DiscoveryService.HAZELCAST );
}

public ClusterOverviewIT( DiscoveryService discoveryService )
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.test.coreedge.ClusterRule;
Expand Down Expand Up @@ -119,8 +120,12 @@ public void hzTest() throws Throwable
"localhost:" + testSocket.getLocalPort() ) );
config.augment( singletonMap( GraphDatabaseSettings.boltConnector( "bolt" ).enabled.name(), "true" ) );

Neo4jJobScheduler jobScheduler = new Neo4jJobScheduler();
jobScheduler.init();

CoreTopologyService coreTopologyService = hzFactory
.coreTopologyService( config, new MemberId( UUID.randomUUID() ), logProvider, userLogProvider );
.coreTopologyService( config, new MemberId( UUID.randomUUID() ), jobScheduler, logProvider,
userLogProvider );

try
{
Expand Down

0 comments on commit 5bdbbd2

Please sign in to comment.