Skip to content
This repository
Browse code

Initial check in for metrics 2.1.2.

  • Loading branch information...
commit 2ca808e1b0445e2183def681a6ed599a20c42796 1 parent c03b4ba
Sean Laurent authored
3  .gitignore
... ... @@ -0,0 +1,3 @@
  1 +target
  2 +*.iml
  3 +.idea
164 pom.xml
... ... @@ -0,0 +1,164 @@
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3 + <modelVersion>4.0.0</modelVersion>
  4 +
  5 + <groupId>studyblue</groupId>
  6 + <artifactId>metrics-statsd</artifactId>
  7 + <name>Metrics Statsd Support</name>
  8 + <version>1.0.0</version>
  9 + <packaging>jar</packaging>
  10 +
  11 + <properties>
  12 + <metrics.version>2.1.2</metrics.version>
  13 + <slf4j.version>1.6.4</slf4j.version>
  14 + </properties>
  15 +
  16 + <developers>
  17 + <developer>
  18 + <name>Sean Laurent</name>
  19 + <email>organicveggie@gmail.com</email>
  20 + <timezone>-6</timezone>
  21 + </developer>
  22 + </developers>
  23 +
  24 + <licenses>
  25 + <license>
  26 + <name>Apache License 2.0</name>
  27 + <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
  28 + <distribution>repo</distribution>
  29 + </license>
  30 + </licenses>
  31 +
  32 + <scm>
  33 + <connection>scm:git:git://github.com/organicveggie/metrics-statsd.git</connection>
  34 + <developerConnection>scm:git:git@github.com:organicveggie/metrics-statsd.git</developerConnection>
  35 + <url>http://github.com/organicveggie/metrics-statsd/</url>
  36 + </scm>
  37 +
  38 + <issueManagement>
  39 + <system>github</system>
  40 + <url>http://github.com/organicveggie/metrics-statsd/issues#issue/</url>
  41 + </issueManagement>
  42 +
  43 + <repositories>
  44 + <repository>
  45 + <id>central</id>
  46 + <url>http://oss.sonatype.org/content/repositories/releases</url>
  47 + <snapshots>
  48 + <enabled>false</enabled>
  49 + </snapshots>
  50 + </repository>
  51 + <repository>
  52 + <id>snapshots</id>
  53 + <url>http://oss.sonatype.org/content/repositories/snapshots</url>
  54 + <snapshots>
  55 + <enabled>true</enabled>
  56 + </snapshots>
  57 + </repository>
  58 + </repositories>
  59 +
  60 + <dependencies>
  61 + <dependency>
  62 + <groupId>com.yammer.metrics</groupId>
  63 + <artifactId>metrics-core</artifactId>
  64 + <version>${metrics.version}</version>
  65 + <type>jar</type>
  66 + <scope>compile</scope>
  67 + </dependency>
  68 + <dependency>
  69 + <groupId>org.slf4j</groupId>
  70 + <artifactId>slf4j-api</artifactId>
  71 + <version>${slf4j.version}</version>
  72 + </dependency>
  73 + <dependency>
  74 + <groupId>org.slf4j</groupId>
  75 + <artifactId>slf4j-jdk14</artifactId>
  76 + <version>${slf4j.version}</version>
  77 + <scope>test</scope>
  78 + </dependency>
  79 + <dependency>
  80 + <groupId>com.yammer.metrics</groupId>
  81 + <artifactId>metrics-core</artifactId>
  82 + <version>${metrics.version}</version>
  83 + <type>test-jar</type>
  84 + <scope>test</scope>
  85 + </dependency>
  86 + <dependency>
  87 + <groupId>junit</groupId>
  88 + <artifactId>junit-dep</artifactId>
  89 + <version>4.10</version>
  90 + <scope>test</scope>
  91 + </dependency>
  92 + <dependency>
  93 + <groupId>org.mockito</groupId>
  94 + <artifactId>mockito-all</artifactId>
  95 + <version>1.9.0</version>
  96 + <scope>test</scope>
  97 + </dependency>
  98 + </dependencies>
  99 +
  100 + <build>
  101 + <plugins>
  102 + <plugin>
  103 + <groupId>org.apache.maven.plugins</groupId>
  104 + <artifactId>maven-compiler-plugin</artifactId>
  105 + <version>2.3.2</version>
  106 + <configuration>
  107 + <source>1.6</source>
  108 + <target>1.6</target>
  109 + </configuration>
  110 + </plugin>
  111 + <plugin>
  112 + <groupId>org.apache.felix</groupId>
  113 + <artifactId>maven-bundle-plugin</artifactId>
  114 + <version>2.3.7</version>
  115 + <extensions>true</extensions>
  116 + </plugin>
  117 + <plugin>
  118 + <groupId>org.apache.maven.plugins</groupId>
  119 + <artifactId>maven-surefire-plugin</artifactId>
  120 + <version>2.8.1</version>
  121 + <configuration>
  122 + <parallel>classes</parallel>
  123 + </configuration>
  124 + </plugin>
  125 + <plugin>
  126 + <groupId>org.apache.maven.plugins</groupId>
  127 + <artifactId>maven-source-plugin</artifactId>
  128 + <version>2.1.2</version>
  129 + <executions>
  130 + <execution>
  131 + <id>attach-sources</id>
  132 + <goals>
  133 + <goal>jar</goal>
  134 + </goals>
  135 + </execution>
  136 + </executions>
  137 + </plugin>
  138 + <plugin>
  139 + <groupId>org.apache.maven.plugins</groupId>
  140 + <artifactId>maven-javadoc-plugin</artifactId>
  141 + <version>2.8.1</version>
  142 + <executions>
  143 + <execution>
  144 + <id>attach-javadocs</id>
  145 + <goals>
  146 + <goal>jar</goal>
  147 + </goals>
  148 + </execution>
  149 + </executions>
  150 + </plugin>
  151 + <plugin>
  152 + <groupId>org.apache.maven.plugins</groupId>
  153 + <artifactId>maven-release-plugin</artifactId>
  154 + <version>2.2.1</version>
  155 + <configuration>
  156 + <autoVersionSubmodules>true</autoVersionSubmodules>
  157 + <mavenExecutorId>forked-path</mavenExecutorId>
  158 + <tagNameFormat>v@{project.version}</tagNameFormat>
  159 + <preparationGoals>clean test</preparationGoals>
  160 + </configuration>
  161 + </plugin>
  162 + </plugins>
  163 + </build>
  164 +</project>
336 src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java
... ... @@ -0,0 +1,336 @@
  1 +package com.bealetech.metrics.reporting;
  2 +
  3 +import com.yammer.metrics.Metrics;
  4 +import com.yammer.metrics.core.*;
  5 +import com.yammer.metrics.reporting.AbstractPollingReporter;
  6 +import com.yammer.metrics.stats.Snapshot;
  7 +import org.slf4j.Logger;
  8 +import org.slf4j.LoggerFactory;
  9 +
  10 +import java.io.*;
  11 +import java.net.DatagramPacket;
  12 +import java.net.DatagramSocket;
  13 +import java.net.InetSocketAddress;
  14 +import java.util.Locale;
  15 +import java.util.Map;
  16 +import java.util.SortedMap;
  17 +import java.util.concurrent.TimeUnit;
  18 +
  19 +public class StatsdReporter extends AbstractPollingReporter implements MetricProcessor<Long> {
  20 +
  21 + public static enum StatType { COUNTER, TIMER, GAUGE }
  22 +
  23 + private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class);
  24 +
  25 + protected final String prefix;
  26 + protected final MetricPredicate predicate;
  27 + protected final Locale locale = Locale.US;
  28 + protected final Clock clock;
  29 + protected final UDPSocketProvider socketProvider;
  30 + protected final VirtualMachineMetrics vm;
  31 + protected Writer writer;
  32 + protected ByteArrayOutputStream outputData;
  33 +
  34 + private boolean printVMMetrics = true;
  35 +
  36 + public interface UDPSocketProvider {
  37 + DatagramSocket get() throws Exception;
  38 + DatagramPacket newPacket(ByteArrayOutputStream out);
  39 + }
  40 +
  41 + public StatsdReporter(String host, int port) throws IOException {
  42 + this(Metrics.defaultRegistry(), host, port, null);
  43 + }
  44 +
  45 + public StatsdReporter(String host, int port, String prefix) throws IOException {
  46 + this(Metrics.defaultRegistry(), host, port, prefix);
  47 + }
  48 +
  49 + public StatsdReporter(MetricsRegistry metricsRegistry, String host, int port) throws IOException {
  50 + this(metricsRegistry, host, port, null);
  51 + }
  52 +
  53 + public StatsdReporter(MetricsRegistry metricsRegistry, String host, int port, String prefix) throws IOException {
  54 + this(metricsRegistry,
  55 + prefix,
  56 + MetricPredicate.ALL,
  57 + new DefaultSocketProvider(host, port),
  58 + Clock.defaultClock());
  59 + }
  60 +
  61 + public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPredicate predicate, UDPSocketProvider socketProvider, Clock clock) throws IOException {
  62 + this(metricsRegistry, prefix, predicate, socketProvider, clock, VirtualMachineMetrics.getInstance());
  63 + }
  64 +
  65 + public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPredicate predicate, UDPSocketProvider socketProvider, Clock clock, VirtualMachineMetrics vm) throws IOException {
  66 + this(metricsRegistry, prefix, predicate, socketProvider, clock, vm, "graphite-reporter");
  67 + }
  68 +
  69 + public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPredicate predicate, UDPSocketProvider socketProvider, Clock clock, VirtualMachineMetrics vm, String name) throws IOException {
  70 + super(metricsRegistry, name);
  71 +
  72 + this.socketProvider = socketProvider;
  73 + this.vm = vm;
  74 +
  75 + this.clock = clock;
  76 +
  77 + if (prefix != null) {
  78 + // Pre-append the "." so that we don't need to make anything conditional later.
  79 + this.prefix = prefix + ".";
  80 + } else {
  81 + this.prefix = "";
  82 + }
  83 + this.predicate = predicate;
  84 + this.outputData = new ByteArrayOutputStream();
  85 + }
  86 +
  87 + public boolean isPrintVMMetrics() {
  88 + return printVMMetrics;
  89 + }
  90 +
  91 + public void setPrintVMMetrics(boolean printVMMetrics) {
  92 + this.printVMMetrics = printVMMetrics;
  93 + }
  94 +
  95 + @Override
  96 + public void run() {
  97 + DatagramSocket socket = null;
  98 + try {
  99 + socket = this.socketProvider.get();
  100 + outputData.reset();
  101 + writer = new BufferedWriter(new OutputStreamWriter(this.outputData));
  102 +
  103 + final long epoch = clock.time() / 1000;
  104 + if (this.printVMMetrics) {
  105 + printVmMetrics(epoch);
  106 + }
  107 + printRegularMetrics(epoch);
  108 +
  109 + // Send UDP data
  110 + writer.flush();
  111 + DatagramPacket packet = this.socketProvider.newPacket(outputData);
  112 + packet.setData(outputData.toByteArray());
  113 + socket.send(packet);
  114 + } catch (Exception e) {
  115 + if (LOG.isDebugEnabled()) {
  116 + LOG.debug("Error writing to Graphite", e);
  117 + } else {
  118 + LOG.warn("Error writing to Graphite: {}", e.getMessage());
  119 + }
  120 + if (writer != null) {
  121 + try {
  122 + writer.flush();
  123 + } catch (IOException e1) {
  124 + LOG.error("Error while flushing writer:", e1);
  125 + }
  126 + }
  127 + } finally {
  128 + if (socket != null) {
  129 + socket.close();
  130 + }
  131 + writer = null;
  132 + }
  133 + }
  134 +
  135 + protected void printVmMetrics(long epoch) {
  136 + // Memory
  137 + sendFloat("jvm.memory.totalInit", StatType.GAUGE, vm.totalInit());
  138 + sendFloat("jvm.memory.totalUsed", StatType.GAUGE, vm.totalUsed());
  139 + sendFloat("jvm.memory.totalMax", StatType.GAUGE, vm.totalMax());
  140 + sendFloat("jvm.memory.totalCommitted", StatType.GAUGE, vm.totalCommitted());
  141 +
  142 + sendFloat("jvm.memory.heapInit", StatType.GAUGE, vm.heapInit());
  143 + sendFloat("jvm.memory.heapUsed", StatType.GAUGE, vm.heapUsed());
  144 + sendFloat("jvm.memory.heapMax", StatType.GAUGE, vm.heapMax());
  145 + sendFloat("jvm.memory.heapCommitted", StatType.GAUGE, vm.heapCommitted());
  146 +
  147 + sendFloat("jvm.memory.heapUsage", StatType.GAUGE, vm.heapUsage());
  148 + sendFloat("jvm.memory.nonHeapUsage", StatType.GAUGE, vm.nonHeapUsage());
  149 +
  150 + for (Map.Entry<String, Double> pool : vm.memoryPoolUsage().entrySet()) {
  151 + sendFloat("jvm.memory.memory_pool_usages." + sanitizeString(pool.getKey()), StatType.GAUGE, pool.getValue());
  152 + }
  153 +
  154 + // Buffer Pool
  155 + final Map<String, VirtualMachineMetrics.BufferPoolStats> bufferPoolStats = vm.getBufferPoolStats();
  156 + if (!bufferPoolStats.isEmpty()) {
  157 + sendFloat("jvm.buffers.direct.count", StatType.GAUGE, bufferPoolStats.get("direct").getCount());
  158 + sendFloat("jvm.buffers.direct.memoryUsed", StatType.GAUGE, bufferPoolStats.get("direct").getMemoryUsed());
  159 + sendFloat("jvm.buffers.direct.totalCapacity", StatType.GAUGE, bufferPoolStats.get("direct").getTotalCapacity());
  160 +
  161 + sendFloat("jvm.buffers.mapped.count", StatType.GAUGE, bufferPoolStats.get("mapped").getCount());
  162 + sendFloat("jvm.buffers.mapped.memoryUsed", StatType.GAUGE, bufferPoolStats.get("mapped").getMemoryUsed());
  163 + sendFloat("jvm.buffers.mapped.totalCapacity", StatType.GAUGE, bufferPoolStats.get("mapped").getTotalCapacity());
  164 + }
  165 +
  166 + sendInt("jvm.daemon_thread_count", StatType.GAUGE, vm.daemonThreadCount());
  167 + sendInt("jvm.thread_count", StatType.GAUGE, vm.threadCount());
  168 + sendInt("jvm.uptime", StatType.GAUGE, vm.uptime());
  169 + sendFloat("jvm.fd_usage", StatType.GAUGE, vm.fileDescriptorUsage());
  170 +
  171 + for (Map.Entry<Thread.State, Double> entry : vm.threadStatePercentages().entrySet()) {
  172 + sendFloat("jvm.thread-states." + entry.getKey().toString().toLowerCase(), StatType.GAUGE, entry.getValue());
  173 + }
  174 +
  175 + for (Map.Entry<String, VirtualMachineMetrics.GarbageCollectorStats> entry : vm.garbageCollectors().entrySet()) {
  176 + final String name = "jvm.gc." + sanitizeString(entry.getKey());
  177 + sendInt(name + ".time", StatType.GAUGE, entry.getValue().getTime(TimeUnit.MILLISECONDS));
  178 + sendInt(name + ".runs", StatType.GAUGE, entry.getValue().getRuns());
  179 + }
  180 + }
  181 +
  182 + protected void printRegularMetrics(long epoch) {
  183 + for (Map.Entry<String,SortedMap<MetricName,Metric>> entry : getMetricsRegistry().groupedMetrics(predicate).entrySet()) {
  184 + for (Map.Entry<MetricName, Metric> subEntry : entry.getValue().entrySet()) {
  185 + final Metric metric = subEntry.getValue();
  186 + if (metric != null) {
  187 + try {
  188 + metric.processWith(this, subEntry.getKey(), epoch);
  189 + } catch (Exception ignored) {
  190 + LOG.error("Error printing regular metrics:", ignored);
  191 + }
  192 + }
  193 + }
  194 + }
  195 + }
  196 +
  197 + @Override
  198 + public void processMeter(MetricName name, Metered meter, Long epoch) throws Exception {
  199 + final String sanitizedName = sanitizeName(name);
  200 + sendInt(sanitizedName + ".count", StatType.GAUGE, meter.count());
  201 + sendFloat(sanitizedName + ".meanRate", StatType.TIMER, meter.meanRate());
  202 + sendFloat(sanitizedName + ".1MinuteRate", StatType.TIMER, meter.oneMinuteRate());
  203 + sendFloat(sanitizedName + ".5MinuteRate", StatType.TIMER, meter.fiveMinuteRate());
  204 + sendFloat(sanitizedName + ".15MinuteRate", StatType.TIMER, meter.fifteenMinuteRate());
  205 + }
  206 +
  207 + @Override
  208 + public void processCounter(MetricName name, Counter counter, Long epoch) throws Exception {
  209 + sendInt(sanitizeName(name) + ".count", StatType.GAUGE, counter.count());
  210 + }
  211 +
  212 + @Override
  213 + public void processHistogram(MetricName name, Histogram histogram, Long epoch) throws Exception {
  214 + final String sanitizedName = sanitizeName(name);
  215 + sendSummarizable(sanitizedName, histogram);
  216 + sendSampling(sanitizedName, histogram);
  217 + }
  218 +
  219 + @Override
  220 + public void processTimer(MetricName name, Timer timer, Long epoch) throws Exception {
  221 + processMeter(name, timer, epoch);
  222 + final String sanitizedName = sanitizeName(name);
  223 + sendSummarizable(sanitizedName, timer);
  224 + sendSampling(sanitizedName, timer);
  225 + }
  226 +
  227 + @Override
  228 + public void processGauge(MetricName name, Gauge<?> gauge, Long epoch) throws Exception {
  229 + sendObj(sanitizeName(name) + ".count", StatType.GAUGE, gauge.value());
  230 + }
  231 +
  232 + protected void sendSummarizable(String sanitizedName, Summarizable metric) throws IOException {
  233 + sendFloat(sanitizedName + ".min", StatType.TIMER, metric.min());
  234 + sendFloat(sanitizedName + ".max", StatType.TIMER, metric.max());
  235 + sendFloat(sanitizedName + ".mean", StatType.TIMER, metric.mean());
  236 + sendFloat(sanitizedName + ".stddev", StatType.TIMER, metric.stdDev());
  237 + }
  238 +
  239 + protected void sendSampling(String sanitizedName, Sampling metric) throws IOException {
  240 + final Snapshot snapshot = metric.getSnapshot();
  241 + sendFloat(sanitizedName + ".median", StatType.TIMER, snapshot.getMedian());
  242 + sendFloat(sanitizedName + ".75percentile", StatType.TIMER, snapshot.get75thPercentile());
  243 + sendFloat(sanitizedName + ".95percentile", StatType.TIMER, snapshot.get95thPercentile());
  244 + sendFloat(sanitizedName + ".98percentile", StatType.TIMER, snapshot.get98thPercentile());
  245 + sendFloat(sanitizedName + ".99percentile", StatType.TIMER, snapshot.get99thPercentile());
  246 + sendFloat(sanitizedName + ".999percentile", StatType.TIMER, snapshot.get999thPercentile());
  247 + }
  248 +
  249 +
  250 + protected void sendInt(String name, StatType statType, long value) {
  251 + sendData(name, String.format(locale, "%d", value), statType);
  252 + }
  253 +
  254 + protected void sendFloat(String name, StatType statType, double value) {
  255 + sendData(name, String.format(locale, "%2.2f", value), statType);
  256 + }
  257 +
  258 + protected void sendObj(String name, StatType statType, Object value) {
  259 + sendData(name, String.format(locale, "%s", value), statType);
  260 + }
  261 +
  262 + protected String sanitizeName(MetricName name) {
  263 + final StringBuilder sb = new StringBuilder()
  264 + .append(name.getGroup())
  265 + .append('.')
  266 + .append(name.getType())
  267 + .append('.');
  268 + if (name.hasScope()) {
  269 + sb.append(name.getScope())
  270 + .append('.');
  271 + }
  272 + return sb.append(name.getName()).toString();
  273 + }
  274 +
  275 + protected String sanitizeString(String s) {
  276 + return s.replace(' ', '-');
  277 + }
  278 +
  279 + protected void sendData(String name, String value, StatType statType) {
  280 + String statTypeStr = "";
  281 + switch (statType) {
  282 + case COUNTER:
  283 + statTypeStr = "c";
  284 + break;
  285 + case GAUGE:
  286 + statTypeStr = "g";
  287 + break;
  288 + case TIMER:
  289 + statTypeStr = "ms";
  290 + break;
  291 + }
  292 +
  293 + try {
  294 + if (!prefix.isEmpty()) {
  295 + writer.write(prefix);
  296 + }
  297 + writer.write(sanitizeString(name));
  298 + writer.write(":");
  299 + writer.write(value);
  300 + writer.write("|");
  301 + writer.write(statTypeStr);
  302 + writer.write('\n');
  303 + writer.flush();
  304 + } catch (IOException e) {
  305 + LOG.error("Error sending to Graphite:", e);
  306 + }
  307 + }
  308 +
  309 + public static class DefaultSocketProvider implements UDPSocketProvider {
  310 +
  311 + private final String host;
  312 + private final int port;
  313 +
  314 + public DefaultSocketProvider(String host, int port) {
  315 + this.host = host;
  316 + this.port = port;
  317 + }
  318 +
  319 + @Override
  320 + public DatagramSocket get() throws Exception {
  321 + return new DatagramSocket(new InetSocketAddress(this.host, this.port));
  322 + }
  323 +
  324 + @Override
  325 + public DatagramPacket newPacket(ByteArrayOutputStream out) {
  326 + byte[] dataBuffer;
  327 + if (out != null) {
  328 + dataBuffer = out.toByteArray();
  329 + }
  330 + else {
  331 + dataBuffer = new byte[8192];
  332 + }
  333 + return new DatagramPacket(dataBuffer, dataBuffer.length);
  334 + }
  335 + }
  336 +}
311 src/test/java/com/bealetech/metrics/reporting/StatsdReporterTest.java
... ... @@ -0,0 +1,311 @@
  1 +package com.bealetech.metrics.reporting;
  2 +
  3 +import com.yammer.metrics.core.*;
  4 +import com.yammer.metrics.reporting.AbstractPollingReporter;
  5 +import com.yammer.metrics.stats.Snapshot;
  6 +import org.junit.Before;
  7 +import org.junit.Test;
  8 +import org.mockito.invocation.InvocationOnMock;
  9 +import org.mockito.stubbing.Answer;
  10 +import org.mockito.stubbing.Stubber;
  11 +
  12 +import java.io.ByteArrayOutputStream;
  13 +import java.net.DatagramPacket;
  14 +import java.net.DatagramSocket;
  15 +import java.util.Arrays;
  16 +import java.util.Random;
  17 +import java.util.concurrent.Callable;
  18 +import java.util.concurrent.TimeUnit;
  19 +
  20 +import static org.junit.Assert.assertEquals;
  21 +import static org.mockito.Matchers.any;
  22 +import static org.mockito.Mockito.*;
  23 +
  24 +public class StatsdReporterTest {
  25 +
  26 + protected final Clock clock = mock(Clock.class);
  27 + protected AbstractPollingReporter reporter;
  28 + protected TestMetricsRegistry registry;
  29 + protected DatagramPacket packet;
  30 +
  31 + protected static class TestMetricsRegistry extends MetricsRegistry {
  32 + public <T extends Metric> T add(MetricName name, T metric) {
  33 + return getOrAdd(name, metric);
  34 + }
  35 + }
  36 +
  37 + @Before
  38 + public void init() throws Exception {
  39 + when(clock.tick()).thenReturn(1234L);
  40 + when(clock.time()).thenReturn(5678L);
  41 + registry = new TestMetricsRegistry();
  42 + byte[] data = new byte[65536];
  43 + packet = new DatagramPacket(data, data.length);
  44 + reporter = createReporter(registry, clock);
  45 + }
  46 +
  47 + protected AbstractPollingReporter createReporter(MetricsRegistry registry, Clock clock) throws Exception {
  48 + final DatagramSocket socket = mock(DatagramSocket.class);
  49 + final StatsdReporter.UDPSocketProvider provider = mock(StatsdReporter.UDPSocketProvider.class);
  50 + when(provider.get()).thenReturn(socket);
  51 + when(provider.newPacket(any(ByteArrayOutputStream.class))).thenReturn(packet);
  52 +
  53 + final StatsdReporter reporter = new StatsdReporter(registry,
  54 + "prefix",
  55 + MetricPredicate.ALL,
  56 + provider,
  57 + clock);
  58 + reporter.setPrintVMMetrics(false);
  59 + return reporter;
  60 + }
  61 +
  62 + protected <T extends Metric> void assertReporterOutput(Callable<T> action, String... expected) throws Exception {
  63 + // Invoke the callable to trigger (ie, mark()/inc()/etc) and return the metric
  64 + final T metric = action.call();
  65 + try {
  66 + // Add the metric to the registry, run the reporter and flush the result
  67 + registry.add(new MetricName(Object.class, "metric"), metric);
  68 + reporter.run();
  69 +
  70 + String packetData = new String(packet.getData());
  71 + final String[] lines = packetData.split("\r?\n|\r");
  72 + // Assertions: first check that the line count matches then compare line by line ignoring leading and trailing whitespace
  73 + assertEquals("Line count mismatch, was:\n" + Arrays.toString(lines) + "\nexpected:\n" + Arrays
  74 + .toString(expected) + "\n", expected.length,
  75 + lines.length);
  76 + for (int i = 0; i < lines.length; i++) {
  77 + if (!expected[i].trim().equals(lines[i].trim())) {
  78 + System.err.println("Failure comparing line " + (1 + i));
  79 + System.err.println("Was: '" + lines[i] + "'");
  80 + System.err.println("Expected: '" + expected[i] + "'\n");
  81 + }
  82 + assertEquals(expected[i].trim(), lines[i].trim());
  83 + }
  84 + } finally {
  85 + reporter.shutdown();
  86 + }
  87 + }
  88 +
  89 + public String[] expectedGaugeResult(String value) {
  90 + return new String[]{String.format("prefix.java.lang.Object.metric.count:%s|g", value)};
  91 + }
  92 +
  93 + public String[] expectedTimerResult() {
  94 + return new String[]{
  95 + "prefix.java.lang.Object.metric.count:1|g",
  96 + "prefix.java.lang.Object.metric.meanRate:2.00|ms",
  97 + "prefix.java.lang.Object.metric.1MinuteRate:1.00|ms",
  98 + "prefix.java.lang.Object.metric.5MinuteRate:5.00|ms",
  99 + "prefix.java.lang.Object.metric.15MinuteRate:15.00|ms",
  100 + "prefix.java.lang.Object.metric.min:1.00|ms",
  101 + "prefix.java.lang.Object.metric.max:3.00|ms",
  102 + "prefix.java.lang.Object.metric.mean:2.00|ms",
  103 + "prefix.java.lang.Object.metric.stddev:1.50|ms",
  104 + "prefix.java.lang.Object.metric.median:0.50|ms",
  105 + "prefix.java.lang.Object.metric.75percentile:0.75|ms",
  106 + "prefix.java.lang.Object.metric.95percentile:0.95|ms",
  107 + "prefix.java.lang.Object.metric.98percentile:0.98|ms",
  108 + "prefix.java.lang.Object.metric.99percentile:0.99|ms",
  109 + "prefix.java.lang.Object.metric.999percentile:1.00|ms"
  110 + };
  111 + }
  112 +
  113 + public String[] expectedMeterResult() {
  114 + return new String[]{
  115 + "prefix.java.lang.Object.metric.count:1|g",
  116 + "prefix.java.lang.Object.metric.meanRate:2.00|ms",
  117 + "prefix.java.lang.Object.metric.1MinuteRate:1.00|ms",
  118 + "prefix.java.lang.Object.metric.5MinuteRate:5.00|ms",
  119 + "prefix.java.lang.Object.metric.15MinuteRate:15.00|ms",
  120 + };
  121 + }
  122 +
  123 + public String[] expectedHistogramResult() {
  124 + return new String[]{
  125 + "prefix.java.lang.Object.metric.min:1.00|ms",
  126 + "prefix.java.lang.Object.metric.max:3.00|ms",
  127 + "prefix.java.lang.Object.metric.mean:2.00|ms",
  128 + "prefix.java.lang.Object.metric.stddev:1.50|ms",
  129 + "prefix.java.lang.Object.metric.median:0.50|ms",
  130 + "prefix.java.lang.Object.metric.75percentile:0.75|ms",
  131 + "prefix.java.lang.Object.metric.95percentile:0.95|ms",
  132 + "prefix.java.lang.Object.metric.98percentile:0.98|ms",
  133 + "prefix.java.lang.Object.metric.99percentile:0.99|ms",
  134 + "prefix.java.lang.Object.metric.999percentile:1.00|ms"
  135 + };
  136 + }
  137 +
  138 + public String[] expectedCounterResult(long count) {
  139 + return new String[]{
  140 + String.format("prefix.java.lang.Object.metric.count:%d|g", count)
  141 + };
  142 + }
  143 +
  144 + @Test
  145 + public final void counter() throws Exception {
  146 + final long count = new Random().nextInt(Integer.MAX_VALUE);
  147 + assertReporterOutput(
  148 + new Callable<Counter>() {
  149 + @Override
  150 + public Counter call() throws Exception {
  151 + return createCounter(count);
  152 + }
  153 + },
  154 + expectedCounterResult(count));
  155 + }
  156 +
  157 + @Test
  158 + public final void histogram() throws Exception {
  159 + assertReporterOutput(
  160 + new Callable<Histogram>() {
  161 + @Override
  162 + public Histogram call() throws Exception {
  163 + return createHistogram();
  164 + }
  165 + },
  166 + expectedHistogramResult());
  167 + }
  168 +
  169 + @Test
  170 + public final void meter() throws Exception {
  171 + assertReporterOutput(
  172 + new Callable<Meter>() {
  173 + @Override
  174 + public Meter call() throws Exception {
  175 + return createMeter();
  176 + }
  177 + },
  178 + expectedMeterResult());
  179 + }
  180 +
  181 + @Test
  182 + public final void timer() throws Exception {
  183 + assertReporterOutput(
  184 + new Callable<Timer>() {
  185 + @Override
  186 + public Timer call() throws Exception {
  187 + return createTimer();
  188 + }
  189 + },
  190 + expectedTimerResult());
  191 + }
  192 +
  193 + @Test
  194 + public final void gauge() throws Exception {
  195 + final String value = "gaugeValue";
  196 + assertReporterOutput(
  197 + new Callable<Gauge<String>>() {
  198 + @Override
  199 + public Gauge<String> call() throws Exception {
  200 + return createGauge();
  201 + }
  202 + },
  203 + expectedGaugeResult(value));
  204 + }
  205 +
  206 + static Counter createCounter(long count) throws Exception {
  207 + final Counter mock = mock(Counter.class);
  208 + when(mock.count()).thenReturn(count);
  209 + return configureMatcher(mock, doAnswer(new MetricsProcessorAction() {
  210 + @Override
  211 + void delegateToProcessor(MetricProcessor<Object> processor, MetricName name, Object context) throws Exception {
  212 + processor.processCounter(name, mock, context);
  213 + }
  214 + }));
  215 + }
  216 +
  217 + static Histogram createHistogram() throws Exception {
  218 + final Histogram mock = mock(Histogram.class);
  219 + setupSummarizableMock(mock);
  220 + setupSamplingMock(mock);
  221 + return configureMatcher(mock, doAnswer(new MetricsProcessorAction() {
  222 + @Override
  223 + void delegateToProcessor(MetricProcessor<Object> processor, MetricName name, Object context) throws Exception {
  224 + processor.processHistogram(name, mock, context);
  225 + }
  226 + }));
  227 + }
  228 +
  229 +
  230 + static Gauge<String> createGauge() throws Exception {
  231 + @SuppressWarnings("unchecked")
  232 + final Gauge<String> mock = mock(Gauge.class);
  233 + when(mock.value()).thenReturn("gaugeValue");
  234 + return configureMatcher(mock, doAnswer(new MetricsProcessorAction() {
  235 + @Override
  236 + void delegateToProcessor(MetricProcessor<Object> processor, MetricName name, Object context) throws Exception {
  237 + processor.processGauge(name, mock, context);
  238 + }
  239 + }));
  240 + }
  241 +
  242 +
  243 + static Timer createTimer() throws Exception {
  244 + final Timer mock = mock(Timer.class);
  245 + when(mock.durationUnit()).thenReturn(TimeUnit.MILLISECONDS);
  246 + setupSummarizableMock(mock);
  247 + setupMeteredMock(mock);
  248 + setupSamplingMock(mock);
  249 + return configureMatcher(mock, doAnswer(new MetricsProcessorAction() {
  250 + @Override
  251 + void delegateToProcessor(MetricProcessor<Object> processor, MetricName name, Object context) throws Exception {
  252 + processor.processTimer(name, mock, context);
  253 + }
  254 + }));
  255 + }
  256 +
  257 + static Meter createMeter() throws Exception {
  258 + final Meter mock = mock(Meter.class);
  259 + setupMeteredMock(mock);
  260 + return configureMatcher(mock, doAnswer(new MetricsProcessorAction() {
  261 + @Override
  262 + void delegateToProcessor(MetricProcessor<Object> processor, MetricName name, Object context) throws Exception {
  263 + processor.processMeter(name, mock, context);
  264 + }
  265 + }));
  266 + }
  267 +
  268 + @SuppressWarnings("unchecked")
  269 + static <T extends Metric> T configureMatcher(T mock, Stubber stub) throws Exception {
  270 + stub.when(mock).processWith(any(MetricProcessor.class), any(MetricName.class), any());
  271 + return mock;
  272 + }
  273 +
  274 + static abstract class MetricsProcessorAction implements Answer<Object> {
  275 + @Override
  276 + public Object answer(InvocationOnMock invocation) throws Throwable {
  277 + @SuppressWarnings("unchecked")
  278 + final MetricProcessor<Object> processor = (MetricProcessor<Object>) invocation.getArguments()[0];
  279 + final MetricName name = (MetricName) invocation.getArguments()[1];
  280 + final Object context = invocation.getArguments()[2];
  281 + delegateToProcessor(processor, name, context);
  282 + return null;
  283 + }
  284 +
  285 + abstract void delegateToProcessor(MetricProcessor<Object> processor, MetricName name, Object context) throws Exception;
  286 + }
  287 +
  288 + static void setupSummarizableMock(Summarizable summarizable) {
  289 + when(summarizable.min()).thenReturn(1d);
  290 + when(summarizable.max()).thenReturn(3d);
  291 + when(summarizable.mean()).thenReturn(2d);
  292 + when(summarizable.stdDev()).thenReturn(1.5d);
  293 + }
  294 +
  295 + static void setupMeteredMock(Metered metered) {
  296 + when(metered.count()).thenReturn(1L);
  297 + when(metered.oneMinuteRate()).thenReturn(1d);
  298 + when(metered.fiveMinuteRate()).thenReturn(5d);
  299 + when(metered.fifteenMinuteRate()).thenReturn(15d);
  300 + when(metered.meanRate()).thenReturn(2d);
  301 + when(metered.eventType()).thenReturn("eventType");
  302 + when(metered.rateUnit()).thenReturn(TimeUnit.SECONDS);
  303 + }
  304 +
  305 + static void setupSamplingMock(Sampling sampling) {
  306 + final double[] values = new double[1000];
  307 + for (int i = 0; i < values.length; i++) {
  308 + values[i] = i / 1000.0;
  309 + }
  310 + when(sampling.getSnapshot()).thenReturn(new Snapshot(values));
  311 + }}

0 comments on commit 2ca808e

Please sign in to comment.
Something went wrong with that request. Please try again.