Skip to content

Commit

Permalink
Transport Client: Adding more nodes causes more scheduled reconnect t…
Browse files Browse the repository at this point in the history
…asks, closes elastic#1062.
  • Loading branch information
kimchy committed Jun 24, 2011
1 parent 40bbb87 commit 08648ec
Showing 1 changed file with 30 additions and 20 deletions.
Expand Up @@ -33,7 +33,11 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;

import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -66,7 +70,7 @@ public class TransportClientNodesService extends AbstractComponent {

private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();

private final Runnable nodesSampler;
private final NodeSampler nodesSampler;

private volatile ScheduledFuture nodesSamplerFuture;

Expand All @@ -88,11 +92,11 @@ public class TransportClientNodesService extends AbstractComponent {
}

if (componentSettings.getAsBoolean("sniff", false)) {
this.nodesSampler = new ScheduledSniffNodesSampler();
this.nodesSampler = new SniffNodesSampler();
} else {
this.nodesSampler = new ScheduledConnectNodeSampler();
this.nodesSampler = new SimpleNodeSampler();
}
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, nodesSampler);
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, new ScheduledNodeSampler());

// we want the transport service to throw connect exceptions, so we can retry
transportService.throwConnectException(true);
Expand All @@ -115,7 +119,7 @@ public TransportClientNodesService addTransportAddress(TransportAddress transpor
ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
listedNodes = builder.addAll(listedNodes).add(new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress)).build();
}
nodesSampler.run();
nodesSampler.sample();
return this;
}

Expand All @@ -129,7 +133,7 @@ public TransportClientNodesService removeTransportAddress(TransportAddress trans
}
listedNodes = builder.build();
}
nodesSampler.run();
nodesSampler.sample();
return this;
}

Expand Down Expand Up @@ -157,8 +161,22 @@ public void close() {
transportService.disconnectFromNode(listedNode);
}

private class ScheduledConnectNodeSampler implements Runnable {
@Override public synchronized void run() {
interface NodeSampler {
void sample();
}

class ScheduledNodeSampler implements Runnable {
@Override public void run() {
nodesSampler.sample();
if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
}
}
}

class SimpleNodeSampler implements NodeSampler {

@Override public synchronized void sample() {
if (closed) {
return;
}
Expand Down Expand Up @@ -188,16 +206,12 @@ private class ScheduledConnectNodeSampler implements Runnable {
}
}
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();

if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
}
}
}

private class ScheduledSniffNodesSampler implements Runnable {
class SniffNodesSampler implements NodeSampler {

@Override public synchronized void run() {
@Override public synchronized void sample() {
if (closed) {
return;
}
Expand Down Expand Up @@ -256,7 +270,7 @@ private class ScheduledSniffNodesSampler implements Runnable {
}
}
// now, make sure we are connected to all the updated nodes
for (Iterator<DiscoveryNode> it = newNodes.iterator(); it.hasNext();) {
for (Iterator<DiscoveryNode> it = newNodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
try {
transportService.connectToNode(node);
Expand All @@ -266,10 +280,6 @@ private class ScheduledSniffNodesSampler implements Runnable {
}
}
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();

if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
}
}
}

Expand Down

0 comments on commit 08648ec

Please sign in to comment.