Skip to content

Commit

Permalink
make cluster binder cope with an eventual topology
Browse files Browse the repository at this point in the history
Because the topology is now cached and refreshed periodically,
the cluster binder can no longer rely on the initial state.
  • Loading branch information
martinfurmanski committed Mar 13, 2017
1 parent 78190c7 commit cac693d
Showing 1 changed file with 29 additions and 32 deletions.
Expand Up @@ -71,50 +71,47 @@ public void bindToCluster( ThrowingConsumer<CoreSnapshot, Throwable> snapshotIns
{
if ( clusterIdStorage.exists() )
{
ClusterId localClusterId = clusterIdStorage.readState();
publishClusterId( localClusterId );
clusterId = localClusterId;
clusterId = clusterIdStorage.readState();
publishClusterId( clusterId );
log.info( "Already bound to cluster: " + clusterId );
return;
}
else

CoreTopology topology;
long endTime = clock.millis() + timeoutMillis;

do
{
ClusterId commonClusterId;
CoreTopology topology = topologyService.coreServers();
if ( topology.canBeBootstrapped() )
topology = topologyService.coreServers();

if ( topology.clusterId() != null )
{
clusterId = topology.clusterId();
log.info( "Bound to cluster: " + clusterId );
}
else if ( topology.canBeBootstrapped() )
{
commonClusterId = new ClusterId( UUID.randomUUID() );
clusterId = new ClusterId( UUID.randomUUID() );
CoreSnapshot snapshot = coreBootstrapper.bootstrap( topology.members() );
log.info( String.format( "Bootstrapped with snapshot: %s and clusterId: %s", snapshot, commonClusterId ) );
log.info( String.format( "Bootstrapped with snapshot: %s and clusterId: %s", snapshot, clusterId ) );

snapshotInstaller.accept( snapshot );
publishClusterId( commonClusterId );
publishClusterId( clusterId );
}
else
{
long endTime = clock.millis() + timeoutMillis;

log.info( "Attempting to bind to : " + topology );
while ( (commonClusterId = topology.clusterId()) == null )
{
if ( clock.millis() < endTime )
{
retryWaiter.apply();
topology = topologyService.coreServers();
}
else
{
throw new TimeoutException( String.format( "Failed to join a cluster with members %s. Another" +
" member should have published a clusterId but none was detected. Please restart the " +
"cluster.", topology ));
}
}

log.info( "Bound to cluster: " + commonClusterId );
retryWaiter.apply();
}
} while ( clusterId == null && clock.millis() < endTime );

clusterIdStorage.writeState( commonClusterId );

clusterId = commonClusterId;
if ( clusterId == null )
{
throw new TimeoutException( String.format(
"Failed to join a cluster with members %s. Another member should have published " +
"a clusterId but none was detected. Please restart the cluster.", topology ) );
}

clusterIdStorage.writeState( clusterId );
}

public ClusterId clusterId()
Expand Down

0 comments on commit cac693d

Please sign in to comment.