Skip to content

Commit

Permalink
Merge branch 'master' of github.com:voldemort/voldemort into track-se…
Browse files Browse the repository at this point in the history
…rver-conn
  • Loading branch information
vinothchandar committed Mar 26, 2012
2 parents a871824 + d2fae73 commit 3481e4c
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 29 deletions.
6 changes: 6 additions & 0 deletions src/java/voldemort/cluster/Node.java
Expand Up @@ -153,4 +153,10 @@ public int hashCode() {
public int compareTo(Node other) {
return Integer.valueOf(this.id).compareTo(other.getId());
}

public boolean isEqualState(Node other) {
return id == other.getId() && host.equalsIgnoreCase(other.getHost())
&& httpPort == other.getHttpPort() && socketPort == other.getSocketPort()
&& adminPort == other.getAdminPort() && zoneId == other.getZoneId();
}
}
Expand Up @@ -43,7 +43,9 @@ public abstract class AbstractFailureDetector implements FailureDetector {
// simply a wrapper around a ConcurrentHashMap anyway :(
protected final ConcurrentHashMap<FailureDetectorListener, Object> listeners;

protected final Map<Node, NodeStatus> nodeStatusMap;
// Maintain the list of nodes and their status by IDs (in order to handle
// host swaps)
protected final Map<Integer, CompositeNodeStatus> idNodeStatusMap;

protected final Logger logger = Logger.getLogger(getClass().getName());

Expand All @@ -53,11 +55,13 @@ protected AbstractFailureDetector(FailureDetectorConfig failureDetectorConfig) {

this.failureDetectorConfig = failureDetectorConfig;
listeners = new ConcurrentHashMap<FailureDetectorListener, Object>();
nodeStatusMap = new ConcurrentHashMap<Node, NodeStatus>();
idNodeStatusMap = new ConcurrentHashMap<Integer, CompositeNodeStatus>();

for(Node node: failureDetectorConfig.getNodes()) {
nodeStatusMap.put(node, createNodeStatus(failureDetectorConfig.getTime()
.getMilliseconds()));
idNodeStatusMap.put(node.getId(),
new CompositeNodeStatus(node,
createNodeStatus(failureDetectorConfig.getTime()
.getMilliseconds())));
}
}

Expand Down Expand Up @@ -211,19 +215,28 @@ protected void setUnavailable(Node node, UnreachableStoreException e) {
}

protected NodeStatus getNodeStatus(Node node) {
NodeStatus nodeStatus = nodeStatusMap.get(node);
NodeStatus nodeStatus = null;
CompositeNodeStatus currentNodeStatus = idNodeStatusMap.get(node.getId());

if(nodeStatus == null) {
if(currentNodeStatus == null || !currentNodeStatus.getNode().isEqualState(node)) {
if(logger.isEnabledFor(Level.WARN))
logger.warn("creating new node status for node " + node.getId()
+ " for failure detector");

// If the host is being replaced, remove old tracking information
if(currentNodeStatus != null) {
idNodeStatusMap.remove(currentNodeStatus);
failureDetectorConfig.removeNode(currentNodeStatus.getNode());
}

nodeStatus = createNodeStatus(failureDetectorConfig.getTime().getMilliseconds());
nodeStatusMap.put(node, nodeStatus);
idNodeStatusMap.put(node.getId(), new CompositeNodeStatus(node, nodeStatus));

if(!failureDetectorConfig.getNodes().contains(node)) {
failureDetectorConfig.addNode(node);
}
}
} else
nodeStatus = currentNodeStatus.getStatus();

return nodeStatus;
}
Expand Down Expand Up @@ -261,4 +274,27 @@ private boolean setAvailable(NodeStatus nodeStatus, boolean isAvailable) {
}
}

private class CompositeNodeStatus {

private Node node;
private NodeStatus status;

CompositeNodeStatus(Node node, NodeStatus status) {
this.node = node;
this.status = status;
}

public void setValues(Node node, NodeStatus status) {
this.node = node;
this.status = status;
}

public Node getNode() {
return this.node;
}

public NodeStatus getStatus() {
return this.status;
}
}
}
Expand Up @@ -23,7 +23,6 @@
import java.util.HashSet;
import java.util.List;

import com.google.common.collect.ImmutableSet;
import voldemort.client.ClientConfig;
import voldemort.cluster.Node;
import voldemort.server.VoldemortConfig;
Expand All @@ -32,6 +31,7 @@
import voldemort.utils.Utils;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

/**
* FailureDetectorConfig simply holds all the data that was available to it upon
Expand Down Expand Up @@ -567,6 +567,11 @@ public synchronized void addNode(Node node) {
nodes.add(node);
}

public synchronized void removeNode(Node node) {
Utils.notNull(node);
nodes.remove(node);
}

public StoreVerifier getStoreVerifier() {
return storeVerifier;
}
Expand Down
41 changes: 22 additions & 19 deletions src/java/voldemort/store/stats/Histogram.java
@@ -1,29 +1,31 @@
package voldemort.store.stats;

import voldemort.VoldemortException;
import voldemort.annotations.concurrency.Threadsafe;

import java.util.Arrays;

import org.apache.log4j.Logger;

import voldemort.annotations.concurrency.Threadsafe;

/**
* A class for computing percentiles based on a histogram. Values are bucketed
* by a configurable bound (e.g., 0-1, 1-2, 2-3). When a value is inserted,
* perform a binary search to find the correct bucket.
*
*
*
*
*/
@Threadsafe
public class Histogram {

private final int nBuckets;
private final int step;
private final int[] buckets;
private final int[] bounds;
private int size;
private static final Logger logger = Logger.getLogger(Histogram.class);

/**
* Initialize an empty histogram
*
*
* @param nBuckets The number of buckets to use
* @param step The size of each bucket
*/
Expand All @@ -34,7 +36,7 @@ public Histogram(int nBuckets, int step) {
this.bounds = new int[nBuckets];
init();
}

protected void init() {
int bound = 0;
for(int i = 0; i < nBuckets; i++, bound += step) {
Expand All @@ -54,22 +56,23 @@ public synchronized void reset() {
/**
* Insert a value into the right bucket of the histogram. If the value is
* larger than any bound, insert into the last bucket
*
*
* @param data The value to insert into the histogram
*/
public synchronized void insert(int data) {
public synchronized void insert(long data) {
int index = findBucket(data);
if(index == -1) {
throw new VoldemortException(data + " can't be bucketed, is invalid!");
logger.error(data + " can't be bucketed, is invalid!");
return;
}
buckets[index]++;
size++;
}

/**
* Find the a value <em>n</em> such that the percentile falls within
* [<em>n</em>, <em>n + step</em>)
*
* Find the a value <em>n</em> such that the percentile falls within [
* <em>n</em>, <em>n + step</em>)
*
* @param quantile The percentile to find
* @return Lower bound associated with the percentile
*/
Expand All @@ -84,9 +87,9 @@ public synchronized int getQuantile(double quantile) {
}
return 0;
}
private int findBucket(int needle) {
int max = step * nBuckets;

private int findBucket(long needle) {
long max = step * nBuckets;
if(needle > max) {
return nBuckets - 1;
}
Expand All @@ -105,8 +108,8 @@ private int findBucket(int needle) {
}
return -1;
}
private int compareToBucket(int bucket, int needle) {

private int compareToBucket(int bucket, long needle) {
int low = bounds[bucket];
int high = low + step;
if(low <= needle && high > needle) {
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/store/stats/RequestCounter.java
Expand Up @@ -171,7 +171,7 @@ public void addRequest(long timeNS,
long numEmptyResponses,
long bytes,
long getAllAggregatedCount) {
int timeMs = (int) timeNS / (int) Time.NS_PER_MS;
long timeMs = timeNS / Time.NS_PER_MS;
if(this.useHistogram) {
histogram.insert(timeMs);
maybeResetHistogram();
Expand Down
28 changes: 28 additions & 0 deletions test/unit/voldemort/store/stats/RequestCounterTest.java
@@ -0,0 +1,28 @@
package voldemort.store.stats;

import org.junit.Before;
import org.junit.Test;

public class RequestCounterTest {

private RequestCounter requestCounter;

@Before
public void setUp() {
// Initialize the RequestCounter with a histogram
requestCounter = new RequestCounter(10000, true);
}

@Test
public void test() {
long val = 234;
requestCounter.addRequest(val);
}

@Test
public void testLargeValues() {
long val = 999999992342756424l;
requestCounter.addRequest(val);
}

}

0 comments on commit 3481e4c

Please sign in to comment.