Skip to content

Commit

Permalink
fixes #639 (#640)
Browse files Browse the repository at this point in the history
  • Loading branch information
GavinChenYan committed Jan 31, 2020
1 parent af7621c commit af05659
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 2 deletions.
Expand Up @@ -652,7 +652,7 @@ public void failed(IOException e) {
}
}


@Test
public String callApiAsync() throws Exception {
final Http2Client client = createClient();
final CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -674,6 +674,7 @@ public String callApiAsync() throws Exception {
return reference.get().getAttachment(Http2Client.RESPONSE_BODY);
}

@Test
public ByteBuffer callApiWithByteBuffer() throws Exception {
final Http2Client client = createClient();
final CountDownLatch latch = new CountDownLatch(1);
Expand Down
@@ -0,0 +1,95 @@
package com.networknt.metrics;

import io.dropwizard.metrics.*;
import io.dropwizard.metrics.influxdb.InfluxDbSender;
import io.dropwizard.metrics.influxdb.data.InfluxDbPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

public class JVMMetricsInfluxDbReporter extends ScheduledReporter {

private static final Logger logger = LoggerFactory.getLogger(JVMMetricsInfluxDbReporter.class);
private final InfluxDbSender influxDb;
private final MetricRegistry registry;
private final Map<String, String> tags;

public JVMMetricsInfluxDbReporter(final MetricRegistry registry, final InfluxDbSender influxDb, String name, MetricFilter filter, TimeUnit rateUnit,
TimeUnit durationUnit, Map<String, String> tags) {
super(registry, name, filter, rateUnit, durationUnit);
this.influxDb = influxDb;
this.registry = registry;
this.tags = tags;
}

@Override
public void report(final SortedMap<MetricName, Gauge> gauges, final SortedMap<MetricName, Counter> counters,
final SortedMap<MetricName, Histogram> histograms, final SortedMap<MetricName, Meter> meters, final SortedMap<MetricName, Timer> timers) {
final long now = System.currentTimeMillis();

JVMMetricsUtil.trackAllJVMMetrics(registry, tags);

if(logger.isDebugEnabled()) logger.debug("InfluxDbReporter report is called with counter size " + counters.size());
try {
influxDb.flush();

//Get gauges again from registry, since the gauges provided in the argument is OUTDATED (Previous collection)
for (Map.Entry<MetricName, Gauge> entry : registry.getGauges().entrySet()) {
reportGauge(entry.getKey(), entry.getValue(), now);
}

if (influxDb.hasSeriesData()) {
influxDb.writeData();
}
} catch (Exception e) {
logger.error("Unable to report to InfluxDB. Discarding data.", e);
}
}

private void reportGauge(MetricName name, Gauge<?> gauge, long now) {
final String value = format(gauge.getValue());
if(value != null) {
Map<String, String> apiTags = new HashMap<>(name.getTags());
String apiName = apiTags.remove("apiName");
Map<String, String> clientTags = new HashMap<>(name.getTags());
String clientId = clientTags.remove("clientId");

influxDb.appendPoints(new InfluxDbPoint(apiName + "." + name.getKey(), apiTags, now, value));
if(clientId != null) {
influxDb.appendPoints(new InfluxDbPoint(clientId + "." + name.getKey(), clientTags, now, value));
}
}
}

private String format(Object o) {
if (o instanceof Float) {
return format(((Float) o).doubleValue());
} else if (o instanceof Double) {
return format(((Double) o).doubleValue());
} else if (o instanceof Byte) {
return format(((Byte) o).longValue());
} else if (o instanceof Short) {
return format(((Short) o).longValue());
} else if (o instanceof Integer) {
return format(((Integer) o).longValue());
} else if (o instanceof Long) {
return format(((Long) o).longValue());
}
return null;
}
private String format(long n) {
return Long.toString(n);
}

private String format(double v) {
// the Carbon plaintext format is pretty underspecified, but it seems like it just wants
// US-formatted digits
return String.format(Locale.US, "%2.4f", v);
}

}
128 changes: 128 additions & 0 deletions metrics/src/main/java/com/networknt/metrics/JVMMetricsUtil.java
@@ -0,0 +1,128 @@
package com.networknt.metrics;

import io.dropwizard.metrics.Gauge;
import io.dropwizard.metrics.Metric;
import io.dropwizard.metrics.MetricName;
import io.dropwizard.metrics.MetricRegistry;
import io.dropwizard.metrics.MetricRegistry.MetricBuilder;

import javax.management.*;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.Map;

public class JVMMetricsUtil {

public static void trackAllJVMMetrics(final MetricRegistry registry, final Map<String, String> commonTags) {
//JVM Metrics
MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
track("mem.heap_mem", memBean.getHeapMemoryUsage(), registry, commonTags);
track("mem.nonheap_mem", memBean.getNonHeapMemoryUsage(), registry, commonTags);

double hmu = ((Long)memBean.getHeapMemoryUsage().getUsed()).doubleValue();
double hmc = ((Long)memBean.getHeapMemoryUsage().getMax()).doubleValue();
if (hmc == -1) {
hmc = ((Long)memBean.getHeapMemoryUsage().getCommitted()).doubleValue();
}
double nhmu = ((Long)memBean.getNonHeapMemoryUsage().getUsed()).doubleValue();
double nhmc = ((Long)memBean.getNonHeapMemoryUsage().getMax()).doubleValue();
if (nhmc == -1) {
nhmc = ((Long)memBean.getNonHeapMemoryUsage().getCommitted()).doubleValue();
}

track("mem.heap_usage", hmu / hmc, registry, commonTags);
track("mem.nonheap_usage", nhmu / nhmc, registry, commonTags);


MBeanServer beans = ManagementFactory.getPlatformMBeanServer();
try {
ObjectName os = new ObjectName("java.lang:type=OperatingSystem");
Double sysCpuLoad = (Double)beans.getAttribute(os, "SystemCpuLoad");
Double processCpuLoad = (Double)beans.getAttribute(os, "ProcessCpuLoad");

double totalPMemory = ((Long)beans.getAttribute(os, "TotalPhysicalMemorySize")).doubleValue();
double freePMemory = ((Long)beans.getAttribute(os, "FreePhysicalMemorySize")).doubleValue();

track("os.sys_cpu_load", sysCpuLoad, registry, commonTags);
track("os.process_cpu_load", processCpuLoad, registry, commonTags);
track("os.mem_usage", (totalPMemory-freePMemory)/totalPMemory, registry, commonTags);
} catch (InstanceNotFoundException | AttributeNotFoundException | MalformedObjectNameException
| ReflectionException | MBeanException e) {
e.printStackTrace();
}

track("thread.count", ManagementFactory.getThreadMXBean().getThreadCount(), registry, commonTags);
}

private static void track(String name, MemoryUsage m, final MetricRegistry registry, final Map<String, String> commonTags) {
MetricName mName = MetricRegistry.name("jvm", name).tagged(commonTags);
registry.remove(mName.resolve("used"));
registry.getOrAdd(mName.resolve("used"), createGaugeMetricBuilder(m.getUsed()));
registry.remove(mName.resolve("init"));
registry.getOrAdd(mName.resolve("init"), createGaugeMetricBuilder(m.getInit()));
registry.remove(mName.resolve("max"));
registry.getOrAdd(mName.resolve("max"), createGaugeMetricBuilder(m.getMax()));
registry.remove(mName.resolve("committed"));
registry.getOrAdd(mName.resolve("committed"), createGaugeMetricBuilder(m.getCommitted()));
}
private static void track(String name, Long value, final MetricRegistry registry, final Map<String, String> commonTags) {
MetricName mName = MetricRegistry.name("jvm", name).tagged(commonTags);
registry.remove(mName);
registry.getOrAdd(mName, createGaugeMetricBuilder(value));
}
private static void track(String name, Double value, final MetricRegistry registry, final Map<String, String> commonTags) {
MetricName mName = MetricRegistry.name("jvm", name).tagged(commonTags);
registry.remove(mName);
registry.getOrAdd(mName, createGaugeMetricBuilder(value));
}

private static void track(String name, int value, final MetricRegistry registry, final Map<String, String> commonTags) {
MetricName mName = MetricRegistry.name("jvm", name).tagged(commonTags);
registry.remove(mName);
registry.getOrAdd(mName, createGaugeMetricBuilder(value));
}

private static MetricBuilder<Gauge<Long>> createGaugeMetricBuilder(long value){
return new MetricBuilder<Gauge<Long>>() {
@Override
public Gauge<Long> newMetric() {
return () -> Long.valueOf(value);
}

@Override
public boolean isInstance(Metric metric) {
return Gauge.class.isInstance(metric);
}
};
}

private static MetricBuilder<Gauge<Double>> createGaugeMetricBuilder(Double value){
return new MetricBuilder<Gauge<Double>>() {
@Override
public Gauge<Double> newMetric() {
return () -> value;
}

@Override
public boolean isInstance(Metric metric) {
return Gauge.class.isInstance(metric);
}
};
}

private static MetricBuilder<Gauge<Integer>> createGaugeMetricBuilder(int value){
return new MetricBuilder<Gauge<Integer>>() {
@Override
public Gauge<Integer> newMetric() {
return () -> Integer.valueOf(value);
}

@Override
public boolean isInstance(Metric metric) {
return Gauge.class.isInstance(metric);
}
};
}

}
10 changes: 9 additions & 1 deletion metrics/src/main/java/com/networknt/metrics/MetricsConfig.java
Expand Up @@ -26,7 +26,7 @@
*/
public class MetricsConfig {
boolean enabled;

boolean enableJVMMonitor;
String influxdbProtocol;
String influxdbHost;
int influxdbPort;
Expand All @@ -49,6 +49,14 @@ public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public boolean isEnableJVMMonitor() {
return enableJVMMonitor;
}

public void setEnableJVMMonitor(boolean enableJVMMonitor) {
this.enableJVMMonitor = enableJVMMonitor;
}

public String getInfluxdbHost() {
return influxdbHost;
}
Expand Down
18 changes: 18 additions & 0 deletions metrics/src/main/java/com/networknt/metrics/MetricsHandler.java
Expand Up @@ -76,6 +76,10 @@ public class MetricsHandler implements MiddlewareHandler {
.filter(MetricFilter.ALL)
.build(influxDb);
reporter.start(config.getReportInMinutes(), TimeUnit.MINUTES);
if (config.enableJVMMonitor) {
createJVMMetricsReporter(influxDb);
}

logger.info("metrics is enabled and reporter is started");
} catch (Exception e) {
// if there are any exception, chances are influxdb is not available. disable this handler.
Expand Down Expand Up @@ -166,4 +170,18 @@ private void incCounterForStatusCode(int statusCode, Map<String, String> commonT
}
}

private static void createJVMMetricsReporter(final InfluxDbSender influxDb) {
Map<String, String> commonTags = new HashMap<>();

commonTags.put("apiName", Server.config.getServiceId());
commonTags.put("environment", Server.config.getEnvironment());
InetAddress inetAddress = Util.getInetAddress();
// On Docker for Mac, inetAddress will be null as there is a bug.
commonTags.put("ipAddress", inetAddress == null ? "unknown" : inetAddress.getHostAddress());
commonTags.put("hostname", inetAddress == null ? "unknown" : inetAddress.getHostName()); // will be container id if in docker.

JVMMetricsInfluxDbReporter jvmReporter = new JVMMetricsInfluxDbReporter(new MetricRegistry(), influxDb, "jvmInfluxDb-reporter",
MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, commonTags);
jvmReporter.start(config.getReportInMinutes(), TimeUnit.MINUTES);
}
}
3 changes: 3 additions & 0 deletions metrics/src/main/resources/config/metrics.yml
Expand Up @@ -4,6 +4,9 @@
# If metrics handler is enabled or not
enabled: false

# If metrics handler is enable for JVM MBean or not
enableJVMMonitor: false

# influxdb protocal can be http, https
influxdbProtocol: http
# influxdb hostname
Expand Down

0 comments on commit af05659

Please sign in to comment.