Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SKARA-971: Stop clumping up logs in logstash #1120

Closed
wants to merge 8 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -33,6 +33,7 @@
requires org.openjdk.skara.network;
requires org.openjdk.skara.version;

requires java.net.http;
requires java.sql;

exports org.openjdk.skara.bots.cli;
@@ -23,22 +23,27 @@
package org.openjdk.skara.bots.cli;

import org.openjdk.skara.bot.LogContextMap;
import org.openjdk.skara.network.RestRequest;
import org.openjdk.skara.json.JSON;

import java.io.*;
import java.net.URI;
import java.net.http.*;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Future;
import java.util.logging.*;
import java.util.regex.Pattern;

/**
* Handles logging to logstash. Be careful not to call anything that creates new
* log records from this class as that can cause infinite recursion.
*/
public class BotLogstashHandler extends StreamHandler {
private final RestRequest endpoint;
private final URI endpoint;
private final HttpClient httpClient;
private final DateTimeFormatter dateTimeFormatter;
private final Logger log = Logger.getLogger("org.openjdk.skara.bots.cli");

// Optionally store all futures for testing purposes
private Collection<Future<HttpResponse<Void>>> futures;

private static class ExtraField {
String name;
@@ -49,7 +54,11 @@ private static class ExtraField {
private final List<ExtraField> extraFields;

BotLogstashHandler(URI endpoint) {
this.endpoint = new RestRequest(endpoint);
this.endpoint = endpoint;
this.httpClient = HttpClient.newBuilder()
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(30))
.build();
dateTimeFormatter = DateTimeFormatter.ISO_INSTANT
.withLocale(Locale.getDefault())
.withZone(ZoneId.systemDefault());
@@ -71,27 +80,28 @@ void addExtraField(String name, String value, String pattern) {
}

private void publishToLogstash(Instant time, Level level, String message, Map<String, String> extraFields) {
try {
var query = JSON.object();
query.put("@timestamp", dateTimeFormatter.format(time));
query.put("level", level.getName());
query.put("level_value", level.intValue());
query.put("message", message);
var query = JSON.object();
query.put("@timestamp", dateTimeFormatter.format(time));
query.put("level", level.getName());
query.put("level_value", level.intValue());
query.put("message", message);

for (var entry : LogContextMap.entrySet()) {
query.put(entry.getKey(), entry.getValue());
}
for (var entry : LogContextMap.entrySet()) {
query.put(entry.getKey(), entry.getValue());
}

for (var extraField : extraFields.entrySet()) {
query.put(extraField.getKey(), extraField.getValue());
}
for (var extraField : extraFields.entrySet()) {
query.put(extraField.getKey(), extraField.getValue());
}

endpoint.post("/")
.body(query)
.executeUnparsed();
} catch (RuntimeException | IOException e) {
log.warning("Exception during logstash publishing: " + e.getMessage());
log.throwing("BotSlackHandler", "publish", e);
var httpRequest = HttpRequest.newBuilder()
.uri(endpoint)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(query.toString()))
.build();
var future = httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.discarding());
if (futures != null) {
futures.add(future);
}
}

@@ -118,4 +128,8 @@ public void publish(LogRecord record) {
}
publishToLogstash(record.getInstant(), record.getLevel(), record.getMessage(), getExtraFields(record));
}

void setFuturesCollection(Collection<Future<HttpResponse<Void>>> futures) {
this.futures = futures;
}
}
@@ -22,22 +22,35 @@
*/
package org.openjdk.skara.bots.cli;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.logging.*;
import java.util.stream.Collectors;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.LogRecord;

import static org.junit.jupiter.api.Assertions.*;

class BotLogstashHandlerTests {

@Test
void simple() throws IOException {
void simple() throws IOException, ExecutionException, InterruptedException {
try (var receiver = new RestReceiver()) {
var handler = new BotLogstashHandler(receiver.getEndpoint());
var futures = new ArrayList<Future<HttpResponse<Void>>>();
handler.setFuturesCollection(futures);

var record = new LogRecord(Level.INFO, "Hello");
handler.publish(record);

for (Future<HttpResponse<Void>> future : futures) {
future.get();
}

var requests = receiver.getRequests();
assertEquals(1, requests.size(), requests.toString());
assertTrue(requests.get(0).get("message").asString().contains("Hello"));
@@ -46,17 +59,27 @@ void simple() throws IOException {
}

@Test
void simpleTask() throws IOException {
void simpleTask() throws IOException, ExecutionException, InterruptedException {
try (var receiver = new RestReceiver()) {
var handler = new BotLogstashHandler(receiver.getEndpoint());
var futures = new ArrayList<Future<HttpResponse<Void>>>();
handler.setFuturesCollection(futures);

LoggingBot.runOnce(handler, log -> {
log.warning("Hello");
log.warning("Warning!");
log.warning("Bye");
});

for (Future<HttpResponse<Void>> future : futures) {
future.get();
}

var requests = receiver.getRequests();
// The async message sending means we may get results in any order. Sort on the
// timestamp to get the actual order.
requests.sort(Comparator.comparing(r -> r.get("@timestamp").toString()));

assertEquals(3, requests.size(), requests.toString());
assertEquals(Level.WARNING.getName(), requests.get(0).get("level").asString());
assertEquals(Level.WARNING.intValue(), requests.get(0).get("level_value").asInt());
@@ -71,15 +94,22 @@ void simpleTask() throws IOException {
}

@Test
void extraField() throws IOException {
void extraField() throws IOException, ExecutionException, InterruptedException {
try (var receiver = new RestReceiver()) {
var handler = new BotLogstashHandler(receiver.getEndpoint());
var futures = new ArrayList<Future<HttpResponse<Void>>>();
handler.setFuturesCollection(futures);

handler.addExtraField("mandatory", "value");
handler.addExtraField("optional1", "$1", "^H(ello)$");
handler.addExtraField("optional2", "$1", "^(Not found)$");
var record = new LogRecord(Level.INFO, "Hello");
handler.publish(record);

for (Future<HttpResponse<Void>> future : futures) {
future.get();
}

var requests = receiver.getRequests();
assertEquals(1, requests.size(), requests.toString());
assertEquals("value", requests.get(0).get("mandatory").asString());
@@ -89,9 +119,12 @@ void extraField() throws IOException {
}

@Test
void extraFieldTask() throws IOException {
void extraFieldTask() throws IOException, ExecutionException, InterruptedException {
try (var receiver = new RestReceiver()) {
var handler = new BotLogstashHandler(receiver.getEndpoint());
var futures = new ArrayList<Future<HttpResponse<Void>>>();
handler.setFuturesCollection(futures);

handler.addExtraField("mandatory", "value");
handler.addExtraField("optional1", "$1", "^H(ello)$");
handler.addExtraField("optional2", "$1", "^(Not found)$");
@@ -104,7 +137,15 @@ void extraFieldTask() throws IOException {
log.warning("Bye");
});

for (Future<HttpResponse<Void>> future : futures) {
future.get();
}

var requests = receiver.getRequests();
// The async message sending means we may get results in any order. Sort on the
// timestamp to get the actual order.
requests.sort(Comparator.comparing(r -> r.get("@timestamp").toString()));

assertEquals(3, requests.size(), requests.toString());
assertEquals("value", requests.get(0).get("mandatory").asString());
assertEquals("ello", requests.get(0).get("optional1").asString());