Skip to content

Commit

Permalink
improve unicast to have another try at pinging other nodes within the…
Browse files Browse the repository at this point in the history
… ping timeout span
  • Loading branch information
kimchy committed Aug 10, 2011
1 parent 1c55567 commit 25c3e89
Showing 1 changed file with 18 additions and 12 deletions.
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -177,16 +178,21 @@ public PingResponse[] pingAndWait(TimeValue timeout) {
@Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException {
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet());
receivedResponses.put(sendPingsHandler.id(), new ConcurrentHashMap<DiscoveryNode, PingResponse>());
sendPings(timeout, false, sendPingsHandler);
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
sendPings(timeout, true, sendPingsHandler);
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
transportService.disconnectFromNode(node);
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(sendPingsHandler.id());
sendPingsHandler.close();
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(sendPingsHandler.id());
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
transportService.disconnectFromNode(node);
}
sendPingsHandler.close();
}
});
}
});
}
Expand Down Expand Up @@ -227,7 +233,7 @@ public void close() {
}
}

void sendPings(final TimeValue timeout, boolean wait, final SendPingsHandler sendPingsHandler) {
void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = sendPingsHandler.id();
pingRequest.timeout = timeout;
Expand Down Expand Up @@ -281,9 +287,9 @@ void sendPings(final TimeValue timeout, boolean wait, final SendPingsHandler sen
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
}
}
if (wait) {
if (waitTime != null) {
try {
latch.await(timeout.millis() * 5, TimeUnit.MILLISECONDS);
latch.await(waitTime.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
Expand Down

0 comments on commit 25c3e89

Please sign in to comment.