Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Log consumers are now called with exactly one complete log line #5854

Merged
merged 15 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2,16 +2,13 @@

import com.github.dockerjava.api.async.ResultCallbackTemplate;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
Expand All @@ -24,27 +21,9 @@ public class FrameConsumerResultCallback extends ResultCallbackTemplate<FrameCon

private static final Logger LOGGER = LoggerFactory.getLogger(FrameConsumerResultCallback.class);

private static final byte[] EMPTY_LINE = new byte[0];
private final Map<OutputFrame.OutputType, LineConsumer> consumers = new HashMap<>();

private static final Pattern ANSI_COLOR_PATTERN = Pattern.compile("\u001B\\[[0-9;]+m");

private static final String LINE_BREAK_REGEX = "((\\r?\\n)|(\\r))";

private static final Pattern LINE_BREAK_PATTERN = Pattern.compile(LINE_BREAK_REGEX);

static final Pattern LINE_BREAK_AT_END_PATTERN = Pattern.compile(LINE_BREAK_REGEX + "$");

private Map<OutputFrame.OutputType, Consumer<OutputFrame>> consumers;

private CountDownLatch completionLatch = new CountDownLatch(1);

private StringBuilder logString = new StringBuilder();

private OutputFrame brokenFrame;

public FrameConsumerResultCallback() {
consumers = new HashMap<>();
}
private final CountDownLatch completionLatch = new CountDownLatch(1);

/**
* Set this callback to use the specified consumer for the given output type.
Expand All @@ -53,23 +32,19 @@ public FrameConsumerResultCallback() {
* @param consumer the consumer to use for that output type
*/
public void addConsumer(OutputFrame.OutputType outputType, Consumer<OutputFrame> consumer) {
consumers.put(outputType, consumer);
consumers.put(outputType, new LineConsumer(outputType, consumer));
}

@Override
public void onNext(Frame frame) {
if (frame != null) {
OutputFrame outputFrame = OutputFrame.forFrame(frame);
if (outputFrame != null) {
Consumer<OutputFrame> consumer = consumers.get(outputFrame.getType());
final OutputFrame.OutputType type = OutputFrame.OutputType.forStreamType(frame.getStreamType());
if (type != null) {
final LineConsumer consumer = consumers.get(type);
if (consumer == null) {
LOGGER.error("got frame with type {}, for which no handler is configured", frame.getStreamType());
} else if (outputFrame.getBytes() != null && outputFrame.getBytes().length > 0) {
if (frame.getStreamType() == StreamType.RAW) {
processRawFrame(outputFrame, consumer);
} else {
processOtherFrame(outputFrame, consumer);
}
} else if (frame.getPayload() != null) {
consumer.processFrame(frame.getPayload());
}
}
}
Expand All @@ -85,19 +60,12 @@ public void onError(Throwable throwable) {

@Override
public void close() throws IOException {
OutputFrame lastLine = null;

if (logString.length() > 0) {
lastLine = new OutputFrame(OutputFrame.OutputType.STDOUT, logString.toString().getBytes());
if (completionLatch.getCount() == 0) {
return;
}

// send an END frame to every consumer... but only once per consumer.
for (Consumer<OutputFrame> consumer : new HashSet<>(consumers.values())) {
if (lastLine != null) {
consumer.accept(lastLine);
}
consumer.accept(OutputFrame.END);
}
consumers.values().forEach(LineConsumer::processBuffer);
consumers.values().forEach(LineConsumer::end);
super.close();

completionLatch.countDown();
Expand All @@ -110,70 +78,75 @@ public CountDownLatch getCompletionLatch() {
return completionLatch;
}

private synchronized void processRawFrame(OutputFrame outputFrame, Consumer<OutputFrame> consumer) {
String utf8String = outputFrame.getUtf8String();
byte[] bytes = outputFrame.getBytes();
private static class LineConsumer {

// Merging the strings by bytes to solve the problem breaking non-latin unicode symbols.
if (brokenFrame != null) {
bytes = merge(brokenFrame.getBytes(), bytes);
utf8String = new String(bytes);
brokenFrame = null;
}
// Logger chunks can break the string in middle of multibyte unicode character.
// Backup the bytes to reconstruct proper char sequence with bytes from next frame.
int lastCharacterType = Character.getType(utf8String.charAt(utf8String.length() - 1));
if (lastCharacterType == Character.OTHER_SYMBOL) {
brokenFrame = new OutputFrame(outputFrame.getType(), bytes);
return;
}
private static final Pattern ANSI_COLOR_PATTERN = Pattern.compile("\u001B\\[[0-9;]+m");

utf8String = processAnsiColorCodes(utf8String, consumer);
normalizeLogLines(utf8String, consumer);
}
private final OutputFrame.OutputType type;

private synchronized void processOtherFrame(OutputFrame outputFrame, Consumer<OutputFrame> consumer) {
String utf8String = outputFrame.getUtf8String();
private final Consumer<OutputFrame> consumer;

utf8String = processAnsiColorCodes(utf8String, consumer);
consumer.accept(new OutputFrame(outputFrame.getType(), utf8String.getBytes()));
}
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

private void normalizeLogLines(String utf8String, Consumer<OutputFrame> consumer) {
// Reformat strings to normalize new lines.
List<String> lines = new ArrayList<>(Arrays.asList(LINE_BREAK_PATTERN.split(utf8String)));
if (lines.isEmpty()) {
consumer.accept(new OutputFrame(OutputFrame.OutputType.STDOUT, EMPTY_LINE));
return;
}
if (utf8String.startsWith("\n") || utf8String.startsWith("\r")) {
lines.add(0, "");
private boolean lastCR = false;

LineConsumer(final OutputFrame.OutputType type, final Consumer<OutputFrame> consumer) {
this.type = type;
this.consumer = consumer;
}
if (utf8String.endsWith("\n") || utf8String.endsWith("\r")) {
lines.add("");

void processFrame(final byte[] b) {
int start = 0;
int i = 0;
while (i < b.length) {
switch (b[i]) {
case '\n':
buffer.write(b, start, i + 1 - start);
start = i + 1;
consume();
lastCR = false;
break;
case '\r':
if (lastCR) {
consume();
}
buffer.write(b, start, i + 1 - start);
start = i + 1;
lastCR = true;
break;
default:
if (lastCR) {
consume();
}
lastCR = false;
}
i++;
}
buffer.write(b, start, b.length - start);
}
for (int i = 0; i < lines.size() - 1; i++) {
String line = lines.get(i);
if (i == 0 && logString.length() > 0) {
line = logString.toString() + line;
logString.setLength(0);

void processBuffer() {
if (buffer.size() > 0) {
consume();
}
consumer.accept(new OutputFrame(OutputFrame.OutputType.STDOUT, line.getBytes()));
}
logString.append(lines.get(lines.size() - 1));
}

private String processAnsiColorCodes(String utf8String, Consumer<OutputFrame> consumer) {
if (!(consumer instanceof BaseConsumer) || ((BaseConsumer) consumer).isRemoveColorCodes()) {
return ANSI_COLOR_PATTERN.matcher(utf8String).replaceAll("");
void end() {
consumer.accept(OutputFrame.END);
}
return utf8String;
}

private byte[] merge(byte[] str1, byte[] str2) {
byte[] mergedString = new byte[str1.length + str2.length];
System.arraycopy(str1, 0, mergedString, 0, str1.length);
System.arraycopy(str2, 0, mergedString, str1.length, str2.length);
return mergedString;
private void consume() {
final String string = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
final byte[] bytes = processAnsiColorCodes(string).getBytes(StandardCharsets.UTF_8);
consumer.accept(new OutputFrame(type, bytes));
buffer.reset();
}

private String processAnsiColorCodes(final String utf8String) {
if (!(consumer instanceof BaseConsumer) || ((BaseConsumer<?>) consumer).isRemoveColorCodes()) {
return ANSI_COLOR_PATTERN.matcher(utf8String).replaceAll("");
}
return utf8String;
}
}
}
Expand Up @@ -2,10 +2,11 @@

import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
import com.google.common.base.Charsets;

import java.nio.charset.StandardCharsets;

/**
* Holds a frame of container output (usually one line, possibly more)
* Holds exactly one complete line of container output. Lines are split on newline characters (LF, CR LF).
*/
public class OutputFrame {

Expand All @@ -15,7 +16,7 @@ public class OutputFrame {

private final byte[] bytes;

public OutputFrame(OutputType type, byte[] bytes) {
public OutputFrame(final OutputType type, final byte[] bytes) {
this.type = type;
this.bytes = bytes;
}
Expand All @@ -29,11 +30,26 @@ public byte[] getBytes() {
}

public String getUtf8String() {
return (bytes == null) ? "" : new String(bytes, StandardCharsets.UTF_8);
}

public String getUtf8StringWithoutLineEnding() {
if (bytes == null) {
return "";
}
return new String(bytes, 0, bytes.length - determineLineEndingLength(bytes), StandardCharsets.UTF_8);
}

return new String(bytes, Charsets.UTF_8);
private static int determineLineEndingLength(final byte[] bytes) {
if (bytes.length > 0) {
switch (bytes[bytes.length - 1]) {
case '\r':
return 1;
case '\n':
return ((bytes.length > 1) && (bytes[bytes.length - 2] == '\r')) ? 2 : 1;
}
}
return 0;
}

public enum OutputType {
Expand All @@ -44,7 +60,6 @@ public enum OutputType {
public static OutputType forStreamType(StreamType streamType) {
switch (streamType) {
case RAW:
return STDOUT;
case STDOUT:
return STDOUT;
case STDERR:
Expand All @@ -56,7 +71,7 @@ public static OutputType forStreamType(StreamType streamType) {
}

public static OutputFrame forFrame(Frame frame) {
OutputType outputType = OutputType.forStreamType(frame.getStreamType());
final OutputType outputType = OutputType.forStreamType(frame.getStreamType());
if (outputType == null) {
return null;
}
Expand Down
Expand Up @@ -50,12 +50,10 @@ public Slf4jLogConsumer withSeparateOutputStreams() {

@Override
public void accept(OutputFrame outputFrame) {
OutputFrame.OutputType outputType = outputFrame.getType();
final OutputFrame.OutputType outputType = outputFrame.getType();
final String utf8String = outputFrame.getUtf8StringWithoutLineEnding();

String utf8String = outputFrame.getUtf8String();
utf8String = FrameConsumerResultCallback.LINE_BREAK_AT_END_PATTERN.matcher(utf8String).replaceAll("");

Map<String, String> originalMdc = MDC.getCopyOfContextMap();
final Map<String, String> originalMdc = MDC.getCopyOfContextMap();
MDC.setContextMap(mdc);
try {
switch (outputType) {
Expand Down
Expand Up @@ -11,14 +11,14 @@
*/
public class ToStringConsumer extends BaseConsumer<ToStringConsumer> {

private ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream();
private final ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream();

@Override
public void accept(OutputFrame outputFrame) {
try {
if (outputFrame.getBytes() != null) {
stringBuffer.write(outputFrame.getBytes());
stringBuffer.flush();
final byte[] bytes = outputFrame.getBytes();
if (bytes != null) {
stringBuffer.write(bytes);
}
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Expand Up @@ -82,11 +82,10 @@ private void waitUntil(Predicate<OutputFrame> predicate, long expiry, int times)
int numberOfMatches = 0;
while (System.currentTimeMillis() < expiry) {
try {
OutputFrame frame = frames.pollLast(100, TimeUnit.MILLISECONDS);
final OutputFrame frame = frames.pollLast(100, TimeUnit.MILLISECONDS);

if (frame != null) {
final String trimmedFrameText = frame.getUtf8String().replaceFirst("\n$", "");
LOGGER.debug("{}: {}", frame.getType(), trimmedFrameText);
LOGGER.debug("{}: {}", frame.getType(), frame.getUtf8StringWithoutLineEnding());

if (predicate.test(frame)) {
numberOfMatches++;
Expand Down