Skip to content

Commit

Permalink
Cache DD host tags even if no metrics are present (#507)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbev committed Mar 11, 2020
1 parent bb3edac commit 89e40f4
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -392,16 +392,12 @@ private boolean reportCheck(final JsonNode check,

private boolean reportSystemMetrics(final JsonNode metrics,
@Nullable final AtomicInteger pointCounter) {
if (metrics == null || !metrics.isObject() || !metrics.has("collection_timestamp")) {
pointHandler.reject((ReportPoint) null,
"WF-300: Payload missing 'collection_timestamp' field");
return false;
}
long timestamp = metrics.get("collection_timestamp").asLong() * 1000;
if (!metrics.has("internalHostname")) {
if (metrics == null || !metrics.isObject() || !metrics.has("internalHostname")) {
pointHandler.reject((ReportPoint) null, "WF-300: Payload missing 'internalHostname' field");
return false;
}

// Some /api/v1/intake requests only contain host-tag metadata so process it first
String hostName = metrics.get("internalHostname").textValue().toLowerCase();
Map<String, String> systemTags = new HashMap<>();
if (metrics.has("host-tags") && metrics.get("host-tags").get("system") != null) {
Expand All @@ -416,55 +412,59 @@ private boolean reportSystemMetrics(final JsonNode metrics,
}
}

// Report "system.io." metrics
JsonNode ioStats = metrics.get("ioStats");
if (ioStats != null && ioStats.isObject()) {
ioStats.fields().forEachRemaining(entry -> {
Map<String, String> deviceTags = ImmutableMap.<String, String>builder().
putAll(systemTags).
put("device", entry.getKey()).
build();
if (entry.getValue() != null && entry.getValue().isObject()) {
entry.getValue().fields().forEachRemaining(metricEntry -> {
String metric = "system.io." + metricEntry.getKey().replace('%', ' ').
replace('/', '_').trim();
reportValue(metric, hostName, deviceTags, metricEntry.getValue(), timestamp,
pointCounter);
});
if (metrics.has("collection_timestamp")) {
long timestamp = metrics.get("collection_timestamp").asLong() * 1000;

// Report "system.io." metrics
JsonNode ioStats = metrics.get("ioStats");
if (ioStats != null && ioStats.isObject()) {
ioStats.fields().forEachRemaining(entry -> {
Map<String, String> deviceTags = ImmutableMap.<String, String>builder().
putAll(systemTags).
put("device", entry.getKey()).
build();
if (entry.getValue() != null && entry.getValue().isObject()) {
entry.getValue().fields().forEachRemaining(metricEntry -> {
String metric = "system.io." + metricEntry.getKey().replace('%', ' ').
replace('/', '_').trim();
reportValue(metric, hostName, deviceTags, metricEntry.getValue(), timestamp,
pointCounter);
});
}
});
}

// Report all metrics that already start with "system."
metrics.fields().forEachRemaining(entry -> {
if (entry.getKey().startsWith("system.")) {
reportValue(entry.getKey(), hostName, systemTags, entry.getValue(), timestamp,
pointCounter);
}
});
}

// Report all metrics that already start with "system."
metrics.fields().forEachRemaining(entry -> {
if (entry.getKey().startsWith("system.")) {
reportValue(entry.getKey(), hostName, systemTags, entry.getValue(), timestamp,
pointCounter);
}
});

// Report CPU and memory metrics
reportValue("system.cpu.guest", hostName, systemTags, metrics.get("cpuGuest"), timestamp, pointCounter);
reportValue("system.cpu.idle", hostName, systemTags, metrics.get("cpuIdle"), timestamp, pointCounter);
reportValue("system.cpu.stolen", hostName, systemTags, metrics.get("cpuStolen"), timestamp, pointCounter);
reportValue("system.cpu.system", hostName, systemTags, metrics.get("cpuSystem"), timestamp, pointCounter);
reportValue("system.cpu.user", hostName, systemTags, metrics.get("cpuUser"), timestamp, pointCounter);
reportValue("system.cpu.wait", hostName, systemTags, metrics.get("cpuWait"), timestamp, pointCounter);
reportValue("system.mem.buffers", hostName, systemTags, metrics.get("memBuffers"), timestamp, pointCounter);
reportValue("system.mem.cached", hostName, systemTags, metrics.get("memCached"), timestamp, pointCounter);
reportValue("system.mem.page_tables", hostName, systemTags, metrics.get("memPageTables"), timestamp, pointCounter);
reportValue("system.mem.shared", hostName, systemTags, metrics.get("memShared"), timestamp, pointCounter);
reportValue("system.mem.slab", hostName, systemTags, metrics.get("memSlab"), timestamp, pointCounter);
reportValue("system.mem.free", hostName, systemTags, metrics.get("memPhysFree"), timestamp, pointCounter);
reportValue("system.mem.pct_usable", hostName, systemTags, metrics.get("memPhysPctUsable"), timestamp, pointCounter);
reportValue("system.mem.total", hostName, systemTags, metrics.get("memPhysTotal"), timestamp, pointCounter);
reportValue("system.mem.usable", hostName, systemTags, metrics.get("memPhysUsable"), timestamp, pointCounter);
reportValue("system.mem.used", hostName, systemTags, metrics.get("memPhysUsed"), timestamp, pointCounter);
reportValue("system.swap.cached", hostName, systemTags, metrics.get("memSwapCached"), timestamp, pointCounter);
reportValue("system.swap.free", hostName, systemTags, metrics.get("memSwapFree"), timestamp, pointCounter);
reportValue("system.swap.pct_free", hostName, systemTags, metrics.get("memSwapPctFree"), timestamp, pointCounter);
reportValue("system.swap.total", hostName, systemTags, metrics.get("memSwapTotal"), timestamp, pointCounter);
reportValue("system.swap.used", hostName, systemTags, metrics.get("memSwapUsed"), timestamp, pointCounter);
// Report CPU and memory metrics
reportValue("system.cpu.guest", hostName, systemTags, metrics.get("cpuGuest"), timestamp, pointCounter);
reportValue("system.cpu.idle", hostName, systemTags, metrics.get("cpuIdle"), timestamp, pointCounter);
reportValue("system.cpu.stolen", hostName, systemTags, metrics.get("cpuStolen"), timestamp, pointCounter);
reportValue("system.cpu.system", hostName, systemTags, metrics.get("cpuSystem"), timestamp, pointCounter);
reportValue("system.cpu.user", hostName, systemTags, metrics.get("cpuUser"), timestamp, pointCounter);
reportValue("system.cpu.wait", hostName, systemTags, metrics.get("cpuWait"), timestamp, pointCounter);
reportValue("system.mem.buffers", hostName, systemTags, metrics.get("memBuffers"), timestamp, pointCounter);
reportValue("system.mem.cached", hostName, systemTags, metrics.get("memCached"), timestamp, pointCounter);
reportValue("system.mem.page_tables", hostName, systemTags, metrics.get("memPageTables"), timestamp, pointCounter);
reportValue("system.mem.shared", hostName, systemTags, metrics.get("memShared"), timestamp, pointCounter);
reportValue("system.mem.slab", hostName, systemTags, metrics.get("memSlab"), timestamp, pointCounter);
reportValue("system.mem.free", hostName, systemTags, metrics.get("memPhysFree"), timestamp, pointCounter);
reportValue("system.mem.pct_usable", hostName, systemTags, metrics.get("memPhysPctUsable"), timestamp, pointCounter);
reportValue("system.mem.total", hostName, systemTags, metrics.get("memPhysTotal"), timestamp, pointCounter);
reportValue("system.mem.usable", hostName, systemTags, metrics.get("memPhysUsable"), timestamp, pointCounter);
reportValue("system.mem.used", hostName, systemTags, metrics.get("memPhysUsed"), timestamp, pointCounter);
reportValue("system.swap.cached", hostName, systemTags, metrics.get("memSwapCached"), timestamp, pointCounter);
reportValue("system.swap.free", hostName, systemTags, metrics.get("memSwapFree"), timestamp, pointCounter);
reportValue("system.swap.pct_free", hostName, systemTags, metrics.get("memSwapPctFree"), timestamp, pointCounter);
reportValue("system.swap.total", hostName, systemTags, metrics.get("memSwapTotal"), timestamp, pointCounter);
reportValue("system.swap.used", hostName, systemTags, metrics.get("memSwapUsed"), timestamp, pointCounter);
}
return true;
}

Expand Down
25 changes: 21 additions & 4 deletions proxy/src/test/java/com/wavefront/agent/PushAgentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,19 @@ public void testDataDogUnifiedPortHandler() throws Exception {
mockHttpClient);
waitUntilListenerIsOnline(ddPort2);

int ddPort3 = findAvailablePort(4990);
PushAgent proxy3 = new PushAgent();
proxy3.proxyConfig.dataBackfillCutoffHours = 100000000;
proxy3.proxyConfig.dataDogJsonPorts = String.valueOf(ddPort3);
proxy3.proxyConfig.dataDogProcessSystemMetrics = true;
proxy3.proxyConfig.dataDogProcessServiceChecks = true;
assertTrue(proxy3.proxyConfig.isDataDogProcessSystemMetrics());
assertTrue(proxy3.proxyConfig.isDataDogProcessServiceChecks());

proxy3.startDataDogListener(proxy3.proxyConfig.getDataDogJsonPorts(), mockHandlerFactory,
mockHttpClient);
waitUntilListenerIsOnline(ddPort3);

// test 1: post to /intake with system metrics enabled and http relay enabled
HttpResponse mockHttpResponse = EasyMock.createMock(HttpResponse.class);
StatusLine mockStatusLine = EasyMock.createMock(StatusLine.class);
Expand Down Expand Up @@ -553,14 +566,15 @@ public void testDataDogUnifiedPortHandler() throws Exception {
gzippedHttpPost("http://localhost:" + ddPort + "/api/v1/check_run", getResource("ddTestServiceCheck.json"));
verify(mockPointHandler);

// test 6: post to /api/v1/series
// test 6: post to /api/v1/series including a /api/v1/intake call to ensure system host-tags are propogated
reset(mockPointHandler);
mockPointHandler.report(ReportPoint.newBuilder().
setTable("dummy").
setMetric("system.net.tcp.retrans_segs").
setHost("testhost").
setTimestamp(1531176936000L).
setValue(0.0d).
setAnnotations(ImmutableMap.of("app", "closedstack", "role", "control")).
build());
expectLastCall().once();
mockPointHandler.report(ReportPoint.newBuilder().
Expand All @@ -569,7 +583,8 @@ public void testDataDogUnifiedPortHandler() throws Exception {
setHost("testhost").
setTimestamp(1531176936000L).
setValue(0.0d).
setAnnotations(ImmutableMap.of("_source", "Launcher", "env", "prod", "type", "test")).
setAnnotations(ImmutableMap.of("_source", "Launcher", "env", "prod",
"app", "openstack", "role", "control")).
build());
expectLastCall().once();
mockPointHandler.report(ReportPoint.newBuilder().
Expand All @@ -578,7 +593,7 @@ public void testDataDogUnifiedPortHandler() throws Exception {
setHost("testhost").
setTimestamp(1531176936000L).
setValue(12.052631578947368d).
setAnnotations(ImmutableMap.of("device", "eth0")).
setAnnotations(ImmutableMap.of("device", "eth0", "app", "closedstack", "role", "control")).
build());
expectLastCall().once();
mockPointHandler.report(ReportPoint.newBuilder().
Expand All @@ -587,10 +602,12 @@ public void testDataDogUnifiedPortHandler() throws Exception {
setHost("testhost").
setTimestamp(1531176936000L).
setValue(400.0d).
setAnnotations(ImmutableMap.of("app", "closedstack", "role", "control")).
build());
expectLastCall().once();
replay(mockPointHandler);
gzippedHttpPost("http://localhost:" + ddPort + "/api/v1/series", getResource("ddTestTimeseries.json"));
gzippedHttpPost("http://localhost:" + ddPort3 + "/intake", getResource("ddTestSystemMetadataOnly.json"));
gzippedHttpPost("http://localhost:" + ddPort3 + "/api/v1/series", getResource("ddTestTimeseries.json"));
verify(mockPointHandler);

// test 7: post multiple checks to /api/v1/check_run with service checks enabled
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"internalHostname": "testHost",
"host-tags": {
"system": [
"app:closedstack",
"role:control"
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
]
],
"host": "testHost",
"tags": ["env:prod,type:test", "source:Launcher"]
"tags": ["env:prod,app:openstack", "source:Launcher"]
},
{
"type": "gauge",
Expand Down

0 comments on commit 89e40f4

Please sign in to comment.