Skip to content

Commit

Permalink
Log consumers are now called with exactly one complete log line (#5854)
Browse files Browse the repository at this point in the history
Docker logs provide a stream, the frames provided are not necessarily split by newline characters. So, it might happen that a frame contains partial log lines.

Changes:
- Line splitting, partial log buffering and line merging independently of the stream type (RAW, STDOUT, STDERR)
- OutputFrame does consistently not contain newline characters (independent of TTY)
- ToStringConsumer now adds newlines
- Slf4jLogConsumer does not need to remove any newlines

Fixes #5843
  • Loading branch information
SgtSilvio committed Feb 16, 2023
1 parent 2c4caad commit 3ca5e67
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 136 deletions.
Expand Up @@ -2,16 +2,13 @@

import com.github.dockerjava.api.async.ResultCallbackTemplate;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
Expand All @@ -24,27 +21,9 @@ public class FrameConsumerResultCallback extends ResultCallbackTemplate<FrameCon

private static final Logger LOGGER = LoggerFactory.getLogger(FrameConsumerResultCallback.class);

private static final byte[] EMPTY_LINE = new byte[0];
private final Map<OutputFrame.OutputType, LineConsumer> consumers = new HashMap<>();

private static final Pattern ANSI_COLOR_PATTERN = Pattern.compile("\u001B\\[[0-9;]+m");

private static final String LINE_BREAK_REGEX = "((\\r?\\n)|(\\r))";

private static final Pattern LINE_BREAK_PATTERN = Pattern.compile(LINE_BREAK_REGEX);

static final Pattern LINE_BREAK_AT_END_PATTERN = Pattern.compile(LINE_BREAK_REGEX + "$");

private Map<OutputFrame.OutputType, Consumer<OutputFrame>> consumers;

private CountDownLatch completionLatch = new CountDownLatch(1);

private StringBuilder logString = new StringBuilder();

private OutputFrame brokenFrame;

public FrameConsumerResultCallback() {
consumers = new HashMap<>();
}
private final CountDownLatch completionLatch = new CountDownLatch(1);

/**
* Set this callback to use the specified consumer for the given output type.
Expand All @@ -53,23 +32,19 @@ public FrameConsumerResultCallback() {
* @param consumer the consumer to use for that output type
*/
public void addConsumer(OutputFrame.OutputType outputType, Consumer<OutputFrame> consumer) {
consumers.put(outputType, consumer);
consumers.put(outputType, new LineConsumer(outputType, consumer));
}

@Override
public void onNext(Frame frame) {
if (frame != null) {
OutputFrame outputFrame = OutputFrame.forFrame(frame);
if (outputFrame != null) {
Consumer<OutputFrame> consumer = consumers.get(outputFrame.getType());
final OutputFrame.OutputType type = OutputFrame.OutputType.forStreamType(frame.getStreamType());
if (type != null) {
final LineConsumer consumer = consumers.get(type);
if (consumer == null) {
LOGGER.error("got frame with type {}, for which no handler is configured", frame.getStreamType());
} else if (outputFrame.getBytes() != null && outputFrame.getBytes().length > 0) {
if (frame.getStreamType() == StreamType.RAW) {
processRawFrame(outputFrame, consumer);
} else {
processOtherFrame(outputFrame, consumer);
}
} else if (frame.getPayload() != null) {
consumer.processFrame(frame.getPayload());
}
}
}
Expand All @@ -85,19 +60,12 @@ public void onError(Throwable throwable) {

@Override
public void close() throws IOException {
OutputFrame lastLine = null;

if (logString.length() > 0) {
lastLine = new OutputFrame(OutputFrame.OutputType.STDOUT, logString.toString().getBytes());
if (completionLatch.getCount() == 0) {
return;
}

// send an END frame to every consumer... but only once per consumer.
for (Consumer<OutputFrame> consumer : new HashSet<>(consumers.values())) {
if (lastLine != null) {
consumer.accept(lastLine);
}
consumer.accept(OutputFrame.END);
}
consumers.values().forEach(LineConsumer::processBuffer);
consumers.values().forEach(LineConsumer::end);
super.close();

completionLatch.countDown();
Expand All @@ -110,70 +78,75 @@ public CountDownLatch getCompletionLatch() {
return completionLatch;
}

private synchronized void processRawFrame(OutputFrame outputFrame, Consumer<OutputFrame> consumer) {
String utf8String = outputFrame.getUtf8String();
byte[] bytes = outputFrame.getBytes();
private static class LineConsumer {

// Merging the strings by bytes to solve the problem breaking non-latin unicode symbols.
if (brokenFrame != null) {
bytes = merge(brokenFrame.getBytes(), bytes);
utf8String = new String(bytes);
brokenFrame = null;
}
// Logger chunks can break the string in middle of multibyte unicode character.
// Backup the bytes to reconstruct proper char sequence with bytes from next frame.
int lastCharacterType = Character.getType(utf8String.charAt(utf8String.length() - 1));
if (lastCharacterType == Character.OTHER_SYMBOL) {
brokenFrame = new OutputFrame(outputFrame.getType(), bytes);
return;
}
private static final Pattern ANSI_COLOR_PATTERN = Pattern.compile("\u001B\\[[0-9;]+m");

utf8String = processAnsiColorCodes(utf8String, consumer);
normalizeLogLines(utf8String, consumer);
}
private final OutputFrame.OutputType type;

private synchronized void processOtherFrame(OutputFrame outputFrame, Consumer<OutputFrame> consumer) {
String utf8String = outputFrame.getUtf8String();
private final Consumer<OutputFrame> consumer;

utf8String = processAnsiColorCodes(utf8String, consumer);
consumer.accept(new OutputFrame(outputFrame.getType(), utf8String.getBytes()));
}
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

private void normalizeLogLines(String utf8String, Consumer<OutputFrame> consumer) {
// Reformat strings to normalize new lines.
List<String> lines = new ArrayList<>(Arrays.asList(LINE_BREAK_PATTERN.split(utf8String)));
if (lines.isEmpty()) {
consumer.accept(new OutputFrame(OutputFrame.OutputType.STDOUT, EMPTY_LINE));
return;
}
if (utf8String.startsWith("\n") || utf8String.startsWith("\r")) {
lines.add(0, "");
private boolean lastCR = false;

LineConsumer(final OutputFrame.OutputType type, final Consumer<OutputFrame> consumer) {
this.type = type;
this.consumer = consumer;
}
if (utf8String.endsWith("\n") || utf8String.endsWith("\r")) {
lines.add("");

void processFrame(final byte[] b) {
int start = 0;
int i = 0;
while (i < b.length) {
switch (b[i]) {
case '\n':
buffer.write(b, start, i + 1 - start);
start = i + 1;
consume();
lastCR = false;
break;
case '\r':
if (lastCR) {
consume();
}
buffer.write(b, start, i + 1 - start);
start = i + 1;
lastCR = true;
break;
default:
if (lastCR) {
consume();
}
lastCR = false;
}
i++;
}
buffer.write(b, start, b.length - start);
}
for (int i = 0; i < lines.size() - 1; i++) {
String line = lines.get(i);
if (i == 0 && logString.length() > 0) {
line = logString.toString() + line;
logString.setLength(0);

void processBuffer() {
if (buffer.size() > 0) {
consume();
}
consumer.accept(new OutputFrame(OutputFrame.OutputType.STDOUT, line.getBytes()));
}
logString.append(lines.get(lines.size() - 1));
}

private String processAnsiColorCodes(String utf8String, Consumer<OutputFrame> consumer) {
if (!(consumer instanceof BaseConsumer) || ((BaseConsumer) consumer).isRemoveColorCodes()) {
return ANSI_COLOR_PATTERN.matcher(utf8String).replaceAll("");
void end() {
consumer.accept(OutputFrame.END);
}
return utf8String;
}

private byte[] merge(byte[] str1, byte[] str2) {
byte[] mergedString = new byte[str1.length + str2.length];
System.arraycopy(str1, 0, mergedString, 0, str1.length);
System.arraycopy(str2, 0, mergedString, str1.length, str2.length);
return mergedString;
private void consume() {
final String string = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
final byte[] bytes = processAnsiColorCodes(string).getBytes(StandardCharsets.UTF_8);
consumer.accept(new OutputFrame(type, bytes));
buffer.reset();
}

private String processAnsiColorCodes(final String utf8String) {
if (!(consumer instanceof BaseConsumer) || ((BaseConsumer<?>) consumer).isRemoveColorCodes()) {
return ANSI_COLOR_PATTERN.matcher(utf8String).replaceAll("");
}
return utf8String;
}
}
}
Expand Up @@ -2,10 +2,11 @@

import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
import com.google.common.base.Charsets;

import java.nio.charset.StandardCharsets;

/**
* Holds a frame of container output (usually one line, possibly more)
* Holds exactly one complete line of container output. Lines are split on newline characters (LF, CR LF).
*/
public class OutputFrame {

Expand All @@ -15,7 +16,7 @@ public class OutputFrame {

private final byte[] bytes;

public OutputFrame(OutputType type, byte[] bytes) {
public OutputFrame(final OutputType type, final byte[] bytes) {
this.type = type;
this.bytes = bytes;
}
Expand All @@ -29,11 +30,26 @@ public byte[] getBytes() {
}

public String getUtf8String() {
return (bytes == null) ? "" : new String(bytes, StandardCharsets.UTF_8);
}

public String getUtf8StringWithoutLineEnding() {
if (bytes == null) {
return "";
}
return new String(bytes, 0, bytes.length - determineLineEndingLength(bytes), StandardCharsets.UTF_8);
}

return new String(bytes, Charsets.UTF_8);
private static int determineLineEndingLength(final byte[] bytes) {
if (bytes.length > 0) {
switch (bytes[bytes.length - 1]) {
case '\r':
return 1;
case '\n':
return ((bytes.length > 1) && (bytes[bytes.length - 2] == '\r')) ? 2 : 1;
}
}
return 0;
}

public enum OutputType {
Expand All @@ -44,7 +60,6 @@ public enum OutputType {
public static OutputType forStreamType(StreamType streamType) {
switch (streamType) {
case RAW:
return STDOUT;
case STDOUT:
return STDOUT;
case STDERR:
Expand All @@ -56,7 +71,7 @@ public static OutputType forStreamType(StreamType streamType) {
}

public static OutputFrame forFrame(Frame frame) {
OutputType outputType = OutputType.forStreamType(frame.getStreamType());
final OutputType outputType = OutputType.forStreamType(frame.getStreamType());
if (outputType == null) {
return null;
}
Expand Down
Expand Up @@ -50,12 +50,10 @@ public Slf4jLogConsumer withSeparateOutputStreams() {

@Override
public void accept(OutputFrame outputFrame) {
OutputFrame.OutputType outputType = outputFrame.getType();
final OutputFrame.OutputType outputType = outputFrame.getType();
final String utf8String = outputFrame.getUtf8StringWithoutLineEnding();

String utf8String = outputFrame.getUtf8String();
utf8String = FrameConsumerResultCallback.LINE_BREAK_AT_END_PATTERN.matcher(utf8String).replaceAll("");

Map<String, String> originalMdc = MDC.getCopyOfContextMap();
final Map<String, String> originalMdc = MDC.getCopyOfContextMap();
MDC.setContextMap(mdc);
try {
switch (outputType) {
Expand Down
Expand Up @@ -11,14 +11,14 @@
*/
public class ToStringConsumer extends BaseConsumer<ToStringConsumer> {

private ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream();
private final ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream();

@Override
public void accept(OutputFrame outputFrame) {
try {
if (outputFrame.getBytes() != null) {
stringBuffer.write(outputFrame.getBytes());
stringBuffer.flush();
final byte[] bytes = outputFrame.getBytes();
if (bytes != null) {
stringBuffer.write(bytes);
}
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Expand Up @@ -82,11 +82,10 @@ private void waitUntil(Predicate<OutputFrame> predicate, long expiry, int times)
int numberOfMatches = 0;
while (System.currentTimeMillis() < expiry) {
try {
OutputFrame frame = frames.pollLast(100, TimeUnit.MILLISECONDS);
final OutputFrame frame = frames.pollLast(100, TimeUnit.MILLISECONDS);

if (frame != null) {
final String trimmedFrameText = frame.getUtf8String().replaceFirst("\n$", "");
LOGGER.debug("{}: {}", frame.getType(), trimmedFrameText);
LOGGER.debug("{}: {}", frame.getType(), frame.getUtf8StringWithoutLineEnding());

if (predicate.test(frame)) {
numberOfMatches++;
Expand Down

0 comments on commit 3ca5e67

Please sign in to comment.