Skip to content

Commit

Permalink
Merge pull request #5 from kernelsid/add_metrics
Browse files Browse the repository at this point in the history
Add metrics to track the reads/writes/exceptions on the trident memcached layer
  • Loading branch information
jasonjckn committed Jun 21, 2013
2 parents f43bbe5 + 381dd9d commit 95515c8
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/jvm/trident/memcached/MemcachedState.java
Expand Up @@ -3,6 +3,8 @@
import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.ReportedFailedException;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.metric.api.CountMetric;
import com.twitter.finagle.ApiException;
import com.twitter.finagle.ApplicationException;
import com.twitter.finagle.ChannelBufferUsageException;
Expand Down Expand Up @@ -122,6 +124,7 @@ public State makeState(Map conf, IMetricsContext context, int partitionIndex, in
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
s.registerMetrics(conf, context);
CachedMap c = new CachedMap(s, _opts.localCacheSize);
MapState ms;
if(_type == StateType.NON_TRANSACTIONAL) {
Expand Down Expand Up @@ -182,6 +185,9 @@ static String getHostPortWeightTuples(List<InetSocketAddress> endpoints) throws
private final Client _client;
private Options _opts;
private Serializer _ser;
CountMetric _mreads;
CountMetric _mwrites;
CountMetric _mexceptions;

public MemcachedState(Client client, Options opts, Serializer<T> ser) {
_client = client;
Expand Down Expand Up @@ -214,6 +220,7 @@ public List<T> multiGet(List<List<Object>> keys) {
}
}
}
_mreads.incrBy(ret.size());
return ret;
} catch(Exception e) {
checkMemcachedException(e);
Expand All @@ -238,13 +245,15 @@ public void multiPut(List<List<Object>> keys, List<T> vals) {
for(Future future: futures) {
future.get();
}
_mwrites.incrBy(futures.size());
} catch(Exception e) {
checkMemcachedException(e);
}
}


private void checkMemcachedException(Exception e) {
_mexceptions.incr();
if(e instanceof RequestException ||
e instanceof ChannelException ||
e instanceof ServiceException ||
Expand All @@ -258,6 +267,13 @@ private void checkMemcachedException(Exception e) {
}
}

private void registerMetrics(Map conf, IMetricsContext context) {
int bucketSize = (int)(conf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
_mreads = context.registerMetric("memcached/readCount", new CountMetric(), bucketSize);
_mwrites = context.registerMetric("memcached/writeCount", new CountMetric(), bucketSize);
_mexceptions = context.registerMetric("memcached/exceptionCount", new CountMetric(), bucketSize);
}

private String toSingleKey(List<Object> key) {
if(key.size()!=1) {
throw new RuntimeException("Memcached state does not support compound keys");
Expand Down

0 comments on commit 95515c8

Please sign in to comment.