Skip to content

Commit

Permalink
make cluster binder cope with an eventual topology
Browse files Browse the repository at this point in the history
Because the topology is now cached and refreshed periodically,
the cluster binder can no longer rely on the initial state.
  • Loading branch information
martinfurmanski committed Mar 8, 2017
1 parent 5c96a03 commit 35f61d4
Showing 1 changed file with 29 additions and 32 deletions.
Expand Up @@ -73,50 +73,47 @@ public void bindToCluster( ThrowingConsumer<CoreSnapshot, Throwable> snapshotIns
{
if ( clusterIdStorage.exists() )
{
ClusterId localClusterId = clusterIdStorage.readState();
publishClusterId( localClusterId );
clusterId = localClusterId;
clusterId = clusterIdStorage.readState();
publishClusterId( clusterId );
log.info( "Already bound to cluster: " + clusterId );
return;
}
else

CoreTopology topology;
long endTime = clock.millis() + timeoutMillis;

do
{
ClusterId commonClusterId;
CoreTopology topology = topologyService.coreServers();
if ( topology.canBeBootstrapped() )
topology = topologyService.coreServers();

if ( topology.clusterId() != null )
{
clusterId = topology.clusterId();
log.info( "Bound to cluster: " + clusterId );
}
else if ( topology.canBeBootstrapped() )
{
commonClusterId = new ClusterId( UUID.randomUUID() );
clusterId = new ClusterId( UUID.randomUUID() );
CoreSnapshot snapshot = coreBootstrapper.bootstrap( topology.members().keySet() );
log.info( String.format( "Bootstrapped with snapshot: %s and clusterId: %s", snapshot, commonClusterId ) );
log.info( String.format( "Bootstrapped with snapshot: %s and clusterId: %s", snapshot, clusterId ) );

snapshotInstaller.accept( snapshot );
publishClusterId( commonClusterId );
publishClusterId( clusterId );
}
else
{
long endTime = clock.millis() + timeoutMillis;

log.info( "Attempting to bind to : " + topology );
while ( (commonClusterId = topology.clusterId()) == null )
{
if ( clock.millis() < endTime )
{
retryWaiter.apply();
topology = topologyService.coreServers();
}
else
{
throw new TimeoutException( String.format( "Failed to join a cluster with members %s. Another" +
" member should have published a clusterId but none was detected. Please restart the " +
"cluster.", topology ));
}
}

log.info( "Bound to cluster: " + commonClusterId );
retryWaiter.apply();
}
} while ( clusterId == null && clock.millis() < endTime );

clusterIdStorage.writeState( commonClusterId );

clusterId = commonClusterId;
if ( clusterId == null )
{
throw new TimeoutException( String.format(
"Failed to join a cluster with members %s. Another member should have published " +
"a clusterId but none was detected. Please restart the cluster.", topology ) );
}

clusterIdStorage.writeState( clusterId );
}

public Optional<ClusterId> get()
Expand Down

0 comments on commit 35f61d4

Please sign in to comment.