Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SKARA-971: Stop clumping up logs in logstash #1120

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
74 changes: 42 additions & 32 deletions bot/src/main/java/org/openjdk/skara/bot/BotRunner.java
Expand Up @@ -22,6 +22,7 @@
*/
package org.openjdk.skara.bot;

import java.util.concurrent.atomic.AtomicInteger;
import org.openjdk.skara.json.JSONValue;

import java.io.IOException;
Expand Down Expand Up @@ -49,6 +50,8 @@ enum TaskPhases {
END
}

private AtomicInteger workIdCounter = new AtomicInteger();

private class RunnableWorkItem implements Runnable {
private final WorkItem item;

Expand All @@ -73,16 +76,19 @@ public void run() {
scratchPath = scratchPaths.removeFirst();
}

log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN);
Collection<WorkItem> followUpItems = null;
try {
followUpItems = item.run(scratchPath);
} catch (RuntimeException e) {
log.severe("Exception during item execution (" + item + "): " + e.getMessage());
item.handleRuntimeException(e);
log.throwing(item.toString(), "run", e);
} finally {
log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END);
try (var __ = new LogContext(Map.of("work_item", item.toString(),
"work_id", String.valueOf(workIdCounter.incrementAndGet())))) {
log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN);
try {
followUpItems = item.run(scratchPath);
} catch (RuntimeException e) {
log.severe("Exception during item execution (" + item + "): " + e.getMessage());
item.handleRuntimeException(e);
log.throwing(item.toString(), "run", e);
} finally {
log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END);
}
}
if (followUpItems != null) {
followUpItems.forEach(BotRunner.this::submitOrSchedule);
Expand Down Expand Up @@ -215,19 +221,21 @@ public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
}

private void checkPeriodicItems() {
log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN);
try {
for (var bot : bots) {
var items = bot.getPeriodicItems();
for (var item : items) {
submitOrSchedule(item);
try (var __ = new LogContext("work_id", String.valueOf(workIdCounter.incrementAndGet()))) {
log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN);
try {
for (var bot : bots) {
var items = bot.getPeriodicItems();
for (var item : items) {
submitOrSchedule(item);
}
}
} catch (RuntimeException e) {
log.severe("Exception during periodic item checking: " + e.getMessage());
log.throwing("BotRunner", "checkPeriodicItems", e);
} finally {
log.log(Level.FINE, "Done checking periodic items", TaskPhases.END);
}
} catch (RuntimeException e) {
log.severe("Exception during periodic item checking: " + e.getMessage());
log.throwing("BotRunner", "checkPeriodicItems", e);
} finally {
log.log(Level.FINE, "Done checking periodic items", TaskPhases.END);
}
}

Expand All @@ -248,20 +256,22 @@ private void itemWatchdog() {
}

private void processRestRequest(JSONValue request) {
log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
log.fine("Request: " + request);
try {
for (var bot : bots) {
var items = bot.processWebHook(request);
for (var item : items) {
submitOrSchedule(item);
try (var __ = new LogContext("work_id", String.valueOf(workIdCounter.incrementAndGet()))) {
log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
log.fine("Request: " + request);
try {
for (var bot : bots) {
var items = bot.processWebHook(request);
for (var item : items) {
submitOrSchedule(item);
}
}
} catch (RuntimeException e) {
log.severe("Exception during rest request processing: " + e.getMessage());
log.throwing("BotRunner", "processRestRequest", e);
} finally {
log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END);
}
} catch (RuntimeException e) {
log.severe("Exception during rest request processing: " + e.getMessage());
log.throwing("BotRunner", "processRestRequest", e);
} finally {
log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END);
}
}

Expand Down
50 changes: 50 additions & 0 deletions bot/src/main/java/org/openjdk/skara/bot/LogContext.java
@@ -0,0 +1,50 @@
package org.openjdk.skara.bot;

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
* A LogContext is used to temporarily add extra log metadata in the current thread.
* It should be initiated with a try-with-resources construct. The variable itself
* is never used, we only want the controlled automatic close at the end of the try
* block. Typically name the variable __. Example:
*
* try (var __ = new LogContext("foo", "bar")) {
* // some code that logs stuff
* }
*/
public class LogContext implements AutoCloseable {
private static final Logger log = Logger.getLogger("org.openjdk.skara.bot");
private final Map<String, String> context = new HashMap<>();

public LogContext(String key, String value) {
this.init(Map.of(key, value));
}

public LogContext(Map<String, String> ctx) {
this.init(ctx);
}

private void init(Map<String, String> newContext) {
for (var entry : newContext.entrySet()) {
String currentValue = LogContextMap.get(entry.getKey());
if (currentValue != null) {
if (!currentValue.equals(entry.getValue())) {
log.severe("Tried to override the current LogContext value: " + currentValue
+ " for " + entry.getKey() + " with a different value: " + entry.getValue());
}
} else {
this.context.put(entry.getKey(), entry.getValue());
LogContextMap.put(entry.getKey(), entry.getValue());
}
}

}

public void close() {
this.context.forEach((key, value) -> {
LogContextMap.remove(key);
});
}
}
48 changes: 48 additions & 0 deletions bot/src/main/java/org/openjdk/skara/bot/LogContextMap.java
@@ -0,0 +1,48 @@
package org.openjdk.skara.bot;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* This class holds a static thread local hashmap to store temporary log
* metadata which our custom StreamHandlers can pick up and include in log
* messages.
*/
public class LogContextMap {

private static final ThreadLocal<HashMap<String, String>> threadContextMap = new ThreadLocal<>();

public static void put(String key, String value) {
if (threadContextMap.get() == null) {
threadContextMap.set(new HashMap<>());
}
var map = threadContextMap.get();
map.put(key, value);
}

public static String get(String key) {
if (threadContextMap.get() != null) {
return threadContextMap.get().get(key);
} else {
return null;
}
}

public static String remove(String key) {
if (threadContextMap.get() != null) {
return threadContextMap.get().remove(key);
} else {
return null;
}
}

public static Set<Map.Entry<String, String>> entrySet() {
if (threadContextMap.get() != null) {
return threadContextMap.get().entrySet();
} else {
return Collections.emptySet();
}
}
}
Expand Up @@ -22,6 +22,9 @@
*/
package org.openjdk.skara.bots.cli;

import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import org.openjdk.skara.args.*;
import org.openjdk.skara.bot.*;
import org.openjdk.skara.network.URIBuilder;
Expand All @@ -41,6 +44,7 @@

public class BotLauncher {
private static Logger log;
private static final LocalDate START_TIME = LocalDate.now();

private static void applyLogging(JSONObject config) {
LogManager.getLogManager().reset();
Expand Down Expand Up @@ -81,11 +85,7 @@ private static void applyLogging(JSONObject config) {
if (config.get("log").asObject().contains("logstash")) {
var logstashConf = config.get("log").get("logstash").asObject();
var level = Level.parse(logstashConf.get("level").asString());
var maxRecords = 100;
if (logstashConf.contains("maxrecords")) {
maxRecords = logstashConf.get("maxrecords").asInt();
}
var handler = new BotLogstashHandler(URIBuilder.base(logstashConf.get("endpoint").asString()).build(), maxRecords);
var handler = new BotLogstashHandler(URIBuilder.base(logstashConf.get("endpoint").asString()).build());
if (logstashConf.contains("fields")) {
for (var field : logstashConf.get("fields").asArray()) {
if (field.asObject().contains("pattern")) {
Expand All @@ -99,6 +99,10 @@ private static void applyLogging(JSONObject config) {
}
}
handler.setLevel(level);
var dateTimeFormatter = DateTimeFormatter.ISO_INSTANT
.withLocale(Locale.getDefault())
.withZone(ZoneId.systemDefault());
handler.addExtraField("instance_start_time", START_TIME.format(dateTimeFormatter));
log.addHandler(handler);
}
}
Expand Down
Expand Up @@ -22,7 +22,7 @@
*/
package org.openjdk.skara.bots.cli;

import org.openjdk.skara.bot.BotTaskAggregationHandler;
import org.openjdk.skara.bot.LogContextMap;
import org.openjdk.skara.network.RestRequest;
import org.openjdk.skara.json.JSON;

Expand All @@ -33,12 +33,10 @@
import java.util.*;
import java.util.logging.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class BotLogstashHandler extends BotTaskAggregationHandler {
public class BotLogstashHandler extends StreamHandler {
private final RestRequest endpoint;
erikj79 marked this conversation as resolved.
Show resolved Hide resolved
private final DateTimeFormatter dateTimeFormatter;
private final int maxRecords;
private final Logger log = Logger.getLogger("org.openjdk.skara.bots.cli");


Expand All @@ -50,9 +48,8 @@ private static class ExtraField {

private final List<ExtraField> extraFields;

BotLogstashHandler(URI endpoint, int maxRecords) {
BotLogstashHandler(URI endpoint) {
this.endpoint = new RestRequest(endpoint);
this.maxRecords = maxRecords;
dateTimeFormatter = DateTimeFormatter.ISO_INSTANT
.withLocale(Locale.getDefault())
.withZone(ZoneId.systemDefault());
Expand Down Expand Up @@ -81,6 +78,10 @@ private void publishToLogstash(Instant time, Level level, String message, Map<St
query.put("level_value", level.intValue());
query.put("message", message);

for (Map.Entry<String, String> entry : LogContextMap.entrySet()) {
erikj79 marked this conversation as resolved.
Show resolved Hide resolved
query.put(entry.getKey(), entry.getValue());
}

for (var extraField : extraFields.entrySet()) {
query.put(extraField.getKey(), extraField.getValue());
}
Expand All @@ -94,28 +95,6 @@ private void publishToLogstash(Instant time, Level level, String message, Map<St
}
}

private String formatDuration(Duration duration) {
return String.format("[%02d:%02d]", duration.toMinutes(), duration.toSeconds() % 60);
}

private String formatRecord(Instant base, LogRecord record) {
var writer = new StringWriter();
var printer = new PrintWriter(writer);

printer.print(formatDuration(Duration.between(base, record.getInstant())));
printer.print("[");
printer.print(record.getLevel().getName());
printer.print("] ");
printer.print(record.getMessage());

var exception = record.getThrown();
if (exception != null) {
exception.printStackTrace(printer);
}

return writer.toString().stripTrailing();
}

private Map<String, String> getExtraFields(LogRecord record) {
var ret = new HashMap<String, String>();
for (var extraField : extraFields) {
Expand All @@ -132,57 +111,8 @@ private Map<String, String> getExtraFields(LogRecord record) {
return ret;
}

// Remove every entry below minLevel
private List<LogRecord> filterRecords(List<LogRecord> records, Level minLevel) {
return records.stream()
.filter(entry -> entry.getLevel().intValue() >= minLevel.intValue())
.collect(Collectors.toList());
}

@Override
public void publishAggregated(List<LogRecord> task) {
var maxLevel = task.stream()
.max(Comparator.comparingInt(r -> r.getLevel().intValue()))
.map(LogRecord::getLevel)
.orElseThrow();
if (maxLevel.intValue() < getLevel().intValue()) {
return;
}

var start = task.get(0).getInstant();

// For duplicate keys, the first value seen is retained
var concatenatedFields = task.stream()
.map(this::getExtraFields)
.flatMap(extra -> extra.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
(value1, value2) -> value1));

// First try to accommodate size limit by filtering out low level logging
if (task.size() > maxRecords) {
task = filterRecords(task, Level.FINER);
}
if (task.size() > maxRecords) {
task = filterRecords(task, Level.FINE);
}

// If there's still too many lines, strip out the middle
if (task.size() > maxRecords) {
var beginning = task.subList(0, maxRecords / 2);
var end = task.subList(task.size() - maxRecords / 2, task.size());
task = beginning;
task.addAll(end);
}

var concatenatedMessage = task.stream()
.map(record -> formatRecord(start, record))
.collect(Collectors.joining("\n"));

publishToLogstash(start, maxLevel, concatenatedMessage, concatenatedFields);
}

@Override
public void publishSingle(LogRecord record) {
public void publish(LogRecord record) {
if (record.getLevel().intValue() < getLevel().intValue()) {
return;
}
Expand Down