From cac693d8e1c529c325962fdf6ad81fd0f6b3168b Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Tue, 7 Mar 2017 16:07:48 +0100 Subject: [PATCH] make cluster binder cope with an eventual topology Because the topology is now cached and refreshed periodically, the cluster binder can no longer rely on the initial state. --- .../identity/ClusterIdentity.java | 61 +++++++++---------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/ClusterIdentity.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/ClusterIdentity.java index 5832a433b8b9c..5c5c4c12363b5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/ClusterIdentity.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/ClusterIdentity.java @@ -71,50 +71,47 @@ public void bindToCluster( ThrowingConsumer snapshotIns { if ( clusterIdStorage.exists() ) { - ClusterId localClusterId = clusterIdStorage.readState(); - publishClusterId( localClusterId ); - clusterId = localClusterId; + clusterId = clusterIdStorage.readState(); + publishClusterId( clusterId ); + log.info( "Already bound to cluster: " + clusterId ); + return; } - else + + CoreTopology topology; + long endTime = clock.millis() + timeoutMillis; + + do { - ClusterId commonClusterId; - CoreTopology topology = topologyService.coreServers(); - if ( topology.canBeBootstrapped() ) + topology = topologyService.coreServers(); + + if ( topology.clusterId() != null ) + { + clusterId = topology.clusterId(); + log.info( "Bound to cluster: " + clusterId ); + } + else if ( topology.canBeBootstrapped() ) { - commonClusterId = new ClusterId( UUID.randomUUID() ); + clusterId = new ClusterId( UUID.randomUUID() ); CoreSnapshot snapshot = coreBootstrapper.bootstrap( topology.members() ); - log.info( String.format( "Bootstrapped with snapshot: %s and clusterId: %s", snapshot, commonClusterId ) ); + log.info( String.format( "Bootstrapped with snapshot: %s and clusterId: %s", snapshot, clusterId ) ); snapshotInstaller.accept( snapshot ); - publishClusterId( commonClusterId ); + publishClusterId( clusterId ); } else { - long endTime = clock.millis() + timeoutMillis; - - log.info( "Attempting to bind to : " + topology ); - while ( (commonClusterId = topology.clusterId()) == null ) - { - if ( clock.millis() < endTime ) - { - retryWaiter.apply(); - topology = topologyService.coreServers(); - } - else - { - throw new TimeoutException( String.format( "Failed to join a cluster with members %s. Another" + - " member should have published a clusterId but none was detected. Please restart the " + - "cluster.", topology )); - } - } - - log.info( "Bound to cluster: " + commonClusterId ); + retryWaiter.apply(); } + } while ( clusterId == null && clock.millis() < endTime ); - clusterIdStorage.writeState( commonClusterId ); - - clusterId = commonClusterId; + if ( clusterId == null ) + { + throw new TimeoutException( String.format( + "Failed to join a cluster with members %s. Another member should have published " + + "a clusterId but none was detected. Please restart the cluster.", topology ) ); } + + clusterIdStorage.writeState( clusterId ); } public ClusterId clusterId()