Permalink
Browse files

Merge branch 'master' of github.com:zhongjiewu/voldemort

  • Loading branch information...
zhongjiewu committed Jun 7, 2012
2 parents b3e7212 + c709be8 commit 9bb1704a6f1ec6be1fbf0a05626c6f62de79e409
@@ -38,7 +38,6 @@
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.utils.ByteArray;
-import voldemort.utils.JmxUtils;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.Versioned;
@@ -69,8 +68,9 @@ public SocketStoreClientFactory(ClientConfig config) {
config.getSocketTimeout(TimeUnit.MILLISECONDS),
config.getSocketBufferSize(),
config.getSocketKeepAlive());
- if(config.isJmxEnabled())
- JmxUtils.registerMbean(storeFactory, JmxUtils.createObjectName(storeFactory.getClass()));
+ if(config.isJmxEnabled()) {
+ ((ClientRequestExecutorPool) storeFactory).registerJmx();
+ }
}
@Override
@@ -87,7 +87,8 @@ public SocketStoreClientFactory(ClientConfig config) {
return getParentStoreClient(storeName, resolver);
}
- private <K, V> StoreClient<K, V> getParentStoreClient(String storeName, InconsistencyResolver<Versioned<V>> resolver) {
+ private <K, V> StoreClient<K, V> getParentStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver) {
return super.getStoreClient(storeName, resolver);
}
@@ -96,7 +97,8 @@ public SocketStoreClientFactory(ClientConfig config) {
try {
return super.getRemoteMetadata(key, url);
} catch(VoldemortException e) {
- // Fix SNA-4227: When an error occurs during bootstrap, close the socket
+ // Fix SNA-4227: When an error occurs during bootstrap, close the
+ // socket
SocketDestination destination = new SocketDestination(url.getHost(),
url.getPort(),
getRequestFormatType());
@@ -37,6 +37,7 @@
import org.apache.log4j.Logger;
import voldemort.store.socket.SocketDestination;
+import voldemort.store.stats.ClientSocketStats;
import voldemort.utils.DaemonThreadFactory;
import voldemort.utils.SelectorManager;
import voldemort.utils.Time;
@@ -61,6 +62,7 @@
private final AtomicInteger counter = new AtomicInteger();
private final Map<SocketDestination, Long> lastClosedTimestamps;
private final Logger logger = Logger.getLogger(getClass());
+ private ClientSocketStats stats;
public ClientRequestExecutorFactory(int selectors,
int connectTimeoutMs,
@@ -94,6 +96,9 @@ public void destroy(SocketDestination dest, ClientRequestExecutor clientRequestE
throws Exception {
clientRequestExecutor.close();
int numDestroyed = destroyed.incrementAndGet();
+ if(stats != null) {
+ stats.connectionDestroy(dest);
+ }
if(logger.isDebugEnabled())
logger.debug("Destroyed socket " + numDestroyed + " connection to " + dest.getHost()
@@ -108,6 +113,9 @@ public void destroy(SocketDestination dest, ClientRequestExecutor clientRequestE
public ClientRequestExecutor create(SocketDestination dest) throws Exception {
int numCreated = created.incrementAndGet();
+ if(stats != null) {
+ stats.connectionCreate(dest);
+ }
if(logger.isDebugEnabled())
logger.debug("Creating socket " + numCreated + " for " + dest.getHost() + ":"
@@ -426,4 +434,8 @@ public void setLastClosedTimestamp(SocketDestination socketDestination) {
lastClosedTimestamps.put(socketDestination, System.nanoTime());
}
+ public void setStats(ClientSocketStats stats) {
+ this.stats = stats;
+ }
+
}
@@ -17,19 +17,17 @@
package voldemort.store.socket.clientrequest;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import voldemort.VoldemortException;
-import voldemort.annotations.jmx.JmxGetter;
-import voldemort.annotations.jmx.JmxManaged;
-import voldemort.annotations.jmx.JmxSetter;
import voldemort.client.protocol.RequestFormatType;
import voldemort.server.RequestRoutingType;
import voldemort.store.UnreachableStoreException;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketStore;
import voldemort.store.socket.SocketStoreFactory;
+import voldemort.store.stats.ClientSocketStats;
+import voldemort.store.stats.ClientSocketStatsJmx;
+import voldemort.utils.JmxUtils;
import voldemort.utils.Time;
import voldemort.utils.Utils;
import voldemort.utils.pool.KeyedResourcePool;
@@ -46,15 +44,11 @@
* terminated upon calling {@link #close()}.
*/
-@JmxManaged(description = "Voldemort socket pool.")
public class ClientRequestExecutorPool implements SocketStoreFactory {
- private final AtomicInteger monitoringInterval = new AtomicInteger(10000);
- private final AtomicInteger checkouts;
- private final AtomicLong waitNs;
- private final AtomicLong avgWaitNs;
private final KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
private final ClientRequestExecutorFactory factory;
+ private ClientSocketStats stats;
public ClientRequestExecutorPool(int selectors,
int maxConnectionsPerNode,
@@ -73,9 +67,8 @@ public ClientRequestExecutorPool(int selectors,
socketBufferSize,
socketKeepAlive);
this.pool = new KeyedResourcePool<SocketDestination, ClientRequestExecutor>(factory, config);
- this.checkouts = new AtomicInteger(0);
- this.waitNs = new AtomicLong(0);
- this.avgWaitNs = new AtomicLong(0);
+ this.stats = new ClientSocketStats(pool);
+ ((ClientRequestExecutorFactory) factory).setStats(stats);
}
public ClientRequestExecutorPool(int maxConnectionsPerNode,
@@ -105,6 +98,13 @@ public SocketStore create(String storeName,
requestRoutingType);
}
+ public void registerJmx() {
+ stats.enableJmx();
+ JmxUtils.registerMbean(new ClientSocketStatsJmx(stats),
+ JmxUtils.createObjectName("voldemort.store.socket.clientrequest",
+ "aggregated"));
+ }
+
/**
* Checkout a socket from the pool
*
@@ -117,7 +117,10 @@ public ClientRequestExecutor checkout(SocketDestination destination) {
// time checkout
long start = System.nanoTime();
ClientRequestExecutor clientRequestExecutor = pool.checkout(destination);
- updateStats(System.nanoTime() - start);
+ long end = System.nanoTime();
+ if(stats != null) {
+ stats.recordCheckoutTimeUs(destination, (end - start) / Time.NS_PER_US);
+ }
return clientRequestExecutor;
} catch(Exception e) {
@@ -126,20 +129,6 @@ public ClientRequestExecutor checkout(SocketDestination destination) {
}
}
- private void updateStats(long checkoutTimeNs) {
- long wait = waitNs.getAndAdd(checkoutTimeNs);
- int count = checkouts.getAndIncrement();
-
- // reset reporting inverval if we have used up the current interval
- int interval = this.monitoringInterval.get();
- if(count % interval == interval - 1) {
- // harmless race condition:
- waitNs.set(0);
- checkouts.set(0);
- avgWaitNs.set(wait / count);
- }
- }
-
/**
* Check the socket back into the pool.
*
@@ -168,36 +157,8 @@ public void close() {
pool.close();
}
- @JmxGetter(name = "socketsCreated", description = "The total number of sockets created by this pool.")
- public int getNumberSocketsCreated() {
- return this.factory.getNumberCreated();
- }
-
- @JmxGetter(name = "socketsDestroyed", description = "The total number of sockets destroyed by this pool.")
- public int getNumberSocketsDestroyed() {
- return this.factory.getNumberDestroyed();
- }
-
- @JmxGetter(name = "numberOfConnections", description = "The number of active connections.")
- public int getNumberOfActiveConnections() {
- return this.pool.getTotalResourceCount();
- }
-
- @JmxGetter(name = "numberOfIdleConnections", description = "The number of idle connections.")
- public int getNumberOfCheckedInConnections() {
- return this.pool.getCheckedInResourceCount();
- }
-
- @JmxGetter(name = "avgWaitTimeMs", description = "The avg. ms of wait time to acquire a connection.")
- public double getAvgWaitTimeMs() {
- return this.avgWaitNs.doubleValue() / Time.NS_PER_MS;
- }
-
- @JmxSetter(name = "monitoringInterval", description = "The number of checkouts over which performance statistics are calculated.")
- public void setMonitoringInterval(int count) {
- if(count <= 0)
- throw new IllegalArgumentException("Monitoring interval must be a positive number.");
- this.monitoringInterval.set(count);
+ public ClientSocketStats getStats() {
+ return stats;
}
}
Oops, something went wrong.

0 comments on commit 9bb1704

Please sign in to comment.