Permalink
Browse files

Merge remote branch 'upstream/master'

  • Loading branch information...
2 parents 1f057d7 + 34c9c1d commit f269819c1f4b7054707c09286358c34b49b73516 Lei Gao committed Apr 10, 2012
@@ -153,4 +153,10 @@ public int hashCode() {
public int compareTo(Node other) {
return Integer.valueOf(this.id).compareTo(other.getId());
}
+
+ public boolean isEqualState(Node other) {
+ return id == other.getId() && host.equalsIgnoreCase(other.getHost())
+ && httpPort == other.getHttpPort() && socketPort == other.getSocketPort()
+ && adminPort == other.getAdminPort() && zoneId == other.getZoneId();
+ }
}
@@ -43,7 +43,9 @@
// simply a wrapper around a ConcurrentHashMap anyway :(
protected final ConcurrentHashMap<FailureDetectorListener, Object> listeners;
- protected final Map<Node, NodeStatus> nodeStatusMap;
+ // Maintain the list of nodes and their status by IDs (in order to handle
+ // host swaps)
+ protected final Map<Integer, CompositeNodeStatus> idNodeStatusMap;
protected final Logger logger = Logger.getLogger(getClass().getName());
@@ -53,11 +55,13 @@ protected AbstractFailureDetector(FailureDetectorConfig failureDetectorConfig) {
this.failureDetectorConfig = failureDetectorConfig;
listeners = new ConcurrentHashMap<FailureDetectorListener, Object>();
- nodeStatusMap = new ConcurrentHashMap<Node, NodeStatus>();
+ idNodeStatusMap = new ConcurrentHashMap<Integer, CompositeNodeStatus>();
for(Node node: failureDetectorConfig.getNodes()) {
- nodeStatusMap.put(node, createNodeStatus(failureDetectorConfig.getTime()
- .getMilliseconds()));
+ idNodeStatusMap.put(node.getId(),
+ new CompositeNodeStatus(node,
+ createNodeStatus(failureDetectorConfig.getTime()
+ .getMilliseconds())));
}
}
@@ -211,19 +215,28 @@ protected void setUnavailable(Node node, UnreachableStoreException e) {
}
protected NodeStatus getNodeStatus(Node node) {
- NodeStatus nodeStatus = nodeStatusMap.get(node);
+ NodeStatus nodeStatus = null;
+ CompositeNodeStatus currentNodeStatus = idNodeStatusMap.get(node.getId());
- if(nodeStatus == null) {
+ if(currentNodeStatus == null || !currentNodeStatus.getNode().isEqualState(node)) {
if(logger.isEnabledFor(Level.WARN))
logger.warn("creating new node status for node " + node.getId()
+ " for failure detector");
+ // If the host is being replaced, remove old tracking information
+ if(currentNodeStatus != null) {
+ idNodeStatusMap.remove(currentNodeStatus);
+ failureDetectorConfig.removeNode(currentNodeStatus.getNode());
+ }
+
nodeStatus = createNodeStatus(failureDetectorConfig.getTime().getMilliseconds());
- nodeStatusMap.put(node, nodeStatus);
+ idNodeStatusMap.put(node.getId(), new CompositeNodeStatus(node, nodeStatus));
+
if(!failureDetectorConfig.getNodes().contains(node)) {
failureDetectorConfig.addNode(node);
}
- }
+ } else
+ nodeStatus = currentNodeStatus.getStatus();
return nodeStatus;
}
@@ -261,4 +274,27 @@ private boolean setAvailable(NodeStatus nodeStatus, boolean isAvailable) {
}
}
+ private class CompositeNodeStatus {
+
+ private Node node;
+ private NodeStatus status;
+
+ CompositeNodeStatus(Node node, NodeStatus status) {
+ this.node = node;
+ this.status = status;
+ }
+
+ public void setValues(Node node, NodeStatus status) {
+ this.node = node;
+ this.status = status;
+ }
+
+ public Node getNode() {
+ return this.node;
+ }
+
+ public NodeStatus getStatus() {
+ return this.status;
+ }
+ }
}
@@ -23,7 +23,6 @@
import java.util.HashSet;
import java.util.List;
-import com.google.common.collect.ImmutableSet;
import voldemort.client.ClientConfig;
import voldemort.cluster.Node;
import voldemort.server.VoldemortConfig;
@@ -32,6 +31,7 @@
import voldemort.utils.Utils;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
/**
* FailureDetectorConfig simply holds all the data that was available to it upon
@@ -567,6 +567,11 @@ public synchronized void addNode(Node node) {
nodes.add(node);
}
+ public synchronized void removeNode(Node node) {
+ Utils.notNull(node);
+ nodes.remove(node);
+ }
+
public StoreVerifier getStoreVerifier() {
return storeVerifier;
}
@@ -25,6 +25,7 @@
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.log4j.Level;
import voldemort.VoldemortException;
@@ -59,12 +60,16 @@
private StreamRequestHandler streamRequestHandler;
+ private MutableInt serverConnectionCount;
+
public AsyncRequestHandler(Selector selector,
SocketChannel socketChannel,
RequestHandlerFactory requestHandlerFactory,
- int socketBufferSize) {
+ int socketBufferSize,
+ MutableInt serverConnectionCount) {
super(selector, socketChannel, socketBufferSize);
this.requestHandlerFactory = requestHandlerFactory;
+ this.serverConnectionCount = serverConnectionCount;
}
@Override
@@ -345,4 +350,12 @@ private boolean initRequestHandler(SelectionKey selectionKey) {
}
}
+ @Override
+ public void close() {
+ if(!isClosed.compareAndSet(false, true))
+ return;
+
+ serverConnectionCount.decrement();
+ closeInternal();
+ }
}
@@ -23,6 +23,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.log4j.Level;
import voldemort.server.protocol.RequestHandlerFactory;
@@ -99,13 +100,16 @@
private final int socketBufferSize;
+ private MutableInt numActiveConnections;
+
public NioSelectorManager(InetSocketAddress endpoint,
RequestHandlerFactory requestHandlerFactory,
int socketBufferSize) {
this.endpoint = endpoint;
this.socketChannelQueue = new ConcurrentLinkedQueue<SocketChannel>();
this.requestHandlerFactory = requestHandlerFactory;
this.socketBufferSize = socketBufferSize;
+ this.numActiveConnections = new MutableInt(0);
}
public void accept(SocketChannel socketChannel) {
@@ -155,10 +159,13 @@ protected void processEvents() {
AsyncRequestHandler attachment = new AsyncRequestHandler(selector,
socketChannel,
requestHandlerFactory,
- socketBufferSize);
+ socketBufferSize,
+ numActiveConnections);
- if(!isClosed.get())
+ if(!isClosed.get()) {
socketChannel.register(selector, SelectionKey.OP_READ, attachment);
+ numActiveConnections.increment();
+ }
} catch(ClosedSelectorException e) {
if(logger.isDebugEnabled())
logger.debug("Selector is closed, exiting");
@@ -177,4 +184,21 @@ protected void processEvents() {
}
}
+ /**
+ * Returns the number of active connections for this selector manager
+ *
+ * @return
+ */
+ public Integer getNumActiveConnections() {
+ return numActiveConnections.toInteger();
+ }
+
+ /**
+ * Returns the number of connections queued for registration
+ *
+ * @return
+ */
+ public Integer getNumQueuedConnections() {
+ return socketChannelQueue.size();
+ }
}
@@ -31,6 +31,7 @@
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
+import voldemort.annotations.jmx.JmxGetter;
import voldemort.server.AbstractSocketService;
import voldemort.server.ServiceType;
import voldemort.server.StatusManager;
@@ -258,4 +259,22 @@ public void run() {
}
+ @JmxGetter(name = "numActiveConnections", description = "total number of active connections across selector managers")
+ public final int getNumActiveConnections() {
+ int sum = 0;
+ for(NioSelectorManager manager: selectorManagers) {
+ sum += manager.getNumActiveConnections();
+ }
+ return sum;
+ }
+
+ @JmxGetter(name = "numQueuedConnections", description = "total number of connections pending for registration with selector managers")
+ public final int getNumQueuedConnections() {
+ int sum = 0;
+ for(NioSelectorManager manager: selectorManagers) {
+ sum += manager.getNumQueuedConnections();
+ }
+ return sum;
+ }
+
}
@@ -240,10 +240,9 @@ public void close() throws VoldemortException {
private <T> T request(ClientRequest<T> delegate, String operationName) {
ClientRequestExecutor clientRequestExecutor = pool.checkout(destination);
-
+ BlockingClientRequest<T> blockingClientRequest = null;
try {
- BlockingClientRequest<T> blockingClientRequest = new BlockingClientRequest<T>(delegate,
- timeoutMs);
+ blockingClientRequest = new BlockingClientRequest<T>(delegate, timeoutMs);
clientRequestExecutor.addClientRequest(blockingClientRequest, timeoutMs);
blockingClientRequest.await();
return blockingClientRequest.getResult();
@@ -255,14 +254,18 @@ public void close() throws VoldemortException {
throw new UnreachableStoreException("Failure in " + operationName + " on "
+ destination + ": " + e.getMessage(), e);
} finally {
+ if(blockingClientRequest != null && !blockingClientRequest.isComplete()) {
+ // close the executor if we timed out
+ clientRequestExecutor.close();
+ }
pool.checkin(destination, clientRequestExecutor);
}
}
/**
* This method handles submitting and then waiting for the request from the
* server. It uses the ClientRequest API to actually write the request and
- * then read back the response. This implementation will block for a
+ * then read back the response. This implementation will not block for a
* response from the server.
*
* @param <T> Return type
@@ -1,29 +1,31 @@
package voldemort.store.stats;
-import voldemort.VoldemortException;
-import voldemort.annotations.concurrency.Threadsafe;
-
import java.util.Arrays;
+import org.apache.log4j.Logger;
+
+import voldemort.annotations.concurrency.Threadsafe;
+
/**
* A class for computing percentiles based on a histogram. Values are bucketed
* by a configurable bound (e.g., 0-1, 1-2, 2-3). When a value is inserted,
* perform a binary search to find the correct bucket.
- *
- *
+ *
+ *
*/
@Threadsafe
public class Histogram {
-
+
private final int nBuckets;
private final int step;
private final int[] buckets;
private final int[] bounds;
private int size;
+ private static final Logger logger = Logger.getLogger(Histogram.class);
/**
* Initialize an empty histogram
- *
+ *
* @param nBuckets The number of buckets to use
* @param step The size of each bucket
*/
@@ -34,7 +36,7 @@ public Histogram(int nBuckets, int step) {
this.bounds = new int[nBuckets];
init();
}
-
+
protected void init() {
int bound = 0;
for(int i = 0; i < nBuckets; i++, bound += step) {
@@ -54,22 +56,23 @@ public synchronized void reset() {
/**
* Insert a value into the right bucket of the histogram. If the value is
* larger than any bound, insert into the last bucket
- *
+ *
* @param data The value to insert into the histogram
*/
- public synchronized void insert(int data) {
+ public synchronized void insert(long data) {
int index = findBucket(data);
if(index == -1) {
- throw new VoldemortException(data + " can't be bucketed, is invalid!");
+ logger.error(data + " can't be bucketed, is invalid!");
+ return;
}
buckets[index]++;
size++;
}
/**
- * Find the a value <em>n</em> such that the percentile falls within
- * [<em>n</em>, <em>n + step</em>)
- *
+ * Find the a value <em>n</em> such that the percentile falls within [
+ * <em>n</em>, <em>n + step</em>)
+ *
* @param quantile The percentile to find
* @return Lower bound associated with the percentile
*/
@@ -84,9 +87,9 @@ public synchronized int getQuantile(double quantile) {
}
return 0;
}
-
- private int findBucket(int needle) {
- int max = step * nBuckets;
+
+ private int findBucket(long needle) {
+ long max = step * nBuckets;
if(needle > max) {
return nBuckets - 1;
}
@@ -105,8 +108,8 @@ private int findBucket(int needle) {
}
return -1;
}
-
- private int compareToBucket(int bucket, int needle) {
+
+ private int compareToBucket(int bucket, long needle) {
int low = bounds[bucket];
int high = low + step;
if(low <= needle && high > needle) {
Oops, something went wrong.

0 comments on commit f269819

Please sign in to comment.