Skip to content

Commit

Permalink
When a node disconnects from the cluster (not enough master nodes, or…
Browse files Browse the repository at this point in the history
… a client node) and rejoins it might not update its internal routing table, closes #
  • Loading branch information
kimchy committed Apr 29, 2012
1 parent 5c6d831 commit deff094
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
Expand Up @@ -610,7 +610,7 @@ private ClusterState disconnectFromCluster(ClusterState clusterState, String rea
.build();

// clear the routing table, we have no master, so we need to recreate the routing when we reform the cluster
RoutingTable routingTable = RoutingTable.builder().version(clusterState.routingTable().version()).build();
RoutingTable routingTable = RoutingTable.builder().build();
// we also clean the metadata, since we are going to recover it if we become master
MetaData metaData = MetaData.builder().build();

Expand Down
Expand Up @@ -185,6 +185,7 @@ public void run() {
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(sendPingsHandler.id());
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
}
sendPingsHandler.close();
Expand Down Expand Up @@ -268,12 +269,21 @@ public void run() {
try {
// connect to the node, see if we manage to do it, if not, bail
if (!nodeFoundByAddress) {
logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), nodeToSend);
transportService.connectToNodeLight(nodeToSend);
} else {
logger.trace("[{}] connecting to {}", sendPingsHandler.id(), nodeToSend);
transportService.connectToNode(nodeToSend);
}
// we are connected, send the ping request
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
logger.trace("[{}] connected to {}", sendPingsHandler.id(), node);
if (receivedResponses.containsKey(sendPingsHandler.id())) {
// we are connected and still in progress, send the ping request
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
} else {
// connect took too long, just log it and bail
latch.countDown();
logger.trace("[{}] connect to {} was too long outside of ping window, bailing", sendPingsHandler.id(), node);
}
} catch (ConnectTransportException e) {
// can't connect to the node
logger.trace("[{}] failed to connect to {}", e, sendPingsHandler.id(), nodeToSend);
Expand All @@ -295,7 +305,7 @@ public void run() {
}

private void sendPingRequestToNode(final int id, TimeValue timeout, UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
logger.trace("[{}] connecting to {}", id, nodeToSend);
logger.trace("[{}] sending to {}", id, nodeToSend);
transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {

@Override
Expand Down

0 comments on commit deff094

Please sign in to comment.