Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a pool of reusable JsonGenerator #631

Merged
merged 7 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package net.logstash.logback.composite;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.util.Objects;
import java.util.ServiceConfigurationError;

Expand All @@ -34,6 +34,7 @@
import ch.qos.logback.core.spi.LifeCycle;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonFactory.Feature;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
Expand Down Expand Up @@ -71,7 +72,7 @@ public abstract class CompositeJsonFormatter<Event extends DeferredProcessingAwa
/**
* The providers that are used to populate the output JSON object.
*/
private JsonProviders<Event> jsonProviders = new JsonProviders<Event>();
private JsonProviders<Event> jsonProviders = new JsonProviders<>();

private JsonEncoding encoding = JsonEncoding.UTF8;

Expand Down Expand Up @@ -118,6 +119,41 @@ public boolean isStarted() {
return started;
}

/**
* Create a reusable {@link JsonFormatter} bound to the given {@link OutputStream}.
*
* @param outputStream the output stream used by the {@link JsonFormatter}
* @return {@link JsonFormatter} writing JSON content in the output stream
* @throws IOException thrown when unable to write in the output stream or when Jackson fails to produce JSON content
*/
public JsonFormatter createJsonFormatter(OutputStream outputStream) throws IOException {
if (!isStarted()) {
throw new IllegalStateException("Formatter is not started");
}

JsonGenerator generator = createGenerator(outputStream);
return new JsonFormatter(generator);
}


public class JsonFormatter implements Closeable {
private final JsonGenerator generator;

public JsonFormatter(JsonGenerator generator) {
this.generator = generator;
}

public void writeEvent(Event event) throws IOException {
writeEventToGenerator(generator, event);
}

@Override
public void close() throws IOException {
this.generator.close();
}
}


private JsonFactory createJsonFactory() {
ObjectMapper objectMapper = new ObjectMapper()
/*
Expand All @@ -133,27 +169,23 @@ private JsonFactory createJsonFactory() {
}
}

return this.jsonFactoryDecorator.decorate(objectMapper.getFactory());
return decorateFactory(objectMapper.getFactory());
}

public void writeEventToOutputStream(Event event, OutputStream outputStream) throws IOException {
try (JsonGenerator generator = createGenerator(outputStream)) {
writeEventToGenerator(generator, event);
}
/*
* Do not flush the outputStream.
*
* Allow something higher in the stack (e.g. the encoder/appender)
* to determine appropriate times to flush.
*/
}

public void writeEventToWriter(Event event, Writer writer) throws IOException {
try (JsonGenerator generator = createGenerator(writer)) {
writeEventToGenerator(generator, event);
}
private JsonFactory decorateFactory(JsonFactory factory) {
return this.jsonFactoryDecorator.decorate(factory)
/*
* Jackson buffer recycling works by maintaining a pool of buffers per thread. This
* feature works best when one JsonGenerator is created per thread, typically in J2EE
* environments.
*
* Each JsonFormatter uses its own instance of JsonGenerator and is reused multiple times
* possibly on different threads. The memory buffers allocated by the JsonGenerator do
* not belong to a particular thread - hence the recycling feature should be disabled.
*/
.disable(Feature.USE_THREAD_LOCAL_FOR_BUFFER_RECYCLING);
}

protected void writeEventToGenerator(JsonGenerator generator, Event event) throws IOException {
if (!isStarted()) {
throw new IllegalStateException("Encoding attempted before starting.");
Expand All @@ -172,10 +204,6 @@ protected void prepareForDeferredProcessing(Event event) {
private JsonGenerator createGenerator(OutputStream outputStream) throws IOException {
return decorateGenerator(jsonFactory.createGenerator(outputStream, encoding));
}

private JsonGenerator createGenerator(Writer writer) throws IOException {
return decorateGenerator(jsonFactory.createGenerator(writer));
}

private JsonGenerator decorateGenerator(JsonGenerator generator) {
return this.jsonGeneratorDecorator.decorate(generator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Objects;

import net.logstash.logback.composite.CompositeJsonFormatter;
import net.logstash.logback.composite.JsonProviders;
import net.logstash.logback.decorate.JsonFactoryDecorator;
import net.logstash.logback.decorate.JsonGeneratorDecorator;
import net.logstash.logback.util.ReusableByteBuffer;
import net.logstash.logback.util.ReusableByteBufferPool;
import net.logstash.logback.util.ReusableJsonFormatterPool;

import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.encoder.EncoderBase;
Expand All @@ -46,16 +46,12 @@ public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware
* unnecessary memory allocations and reduce pressure on the garbage collector.
*/
private int minBufferSize = 1024;

/**
* Provides reusable byte buffers (initialized when the encoder is started).
*/
private ReusableByteBufferPool bufferPool;

private Encoder<Event> prefix;
private Encoder<Event> suffix;

private final CompositeJsonFormatter<Event> formatter;
private ReusableJsonFormatterPool<Event> formatterPool;

private String lineSeparator = System.lineSeparator();

Expand All @@ -65,7 +61,7 @@ public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware

public CompositeJsonEncoder() {
super();
this.formatter = createFormatter();
this.formatter = Objects.requireNonNull(createFormatter());
}

protected abstract CompositeJsonFormatter<Event> createFormatter();
Expand All @@ -75,12 +71,11 @@ public void encode(Event event, OutputStream outputStream) throws IOException {
if (!isStarted()) {
throw new IllegalStateException("Encoder is not started");
}

encode(prefix, event, outputStream);
formatter.writeEventToOutputStream(event, outputStream);
encode(suffix, event, outputStream);

outputStream.write(lineSeparatorBytes);
try (ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) {
encode(cachedFormatter, event);
cachedFormatter.getBuffer().writeTo(outputStream);
}
}

@Override
Expand All @@ -89,18 +84,23 @@ public byte[] encode(Event event) {
throw new IllegalStateException("Encoder is not started");
}

ReusableByteBuffer buffer = bufferPool.acquire();
try {
encode(event, buffer);
return buffer.toByteArray();
try (ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) {
encode(cachedFormatter, event);
return cachedFormatter.getBuffer().toByteArray();

} catch (IOException e) {
addWarn("Error encountered while encoding log event. Event: " + event, e);
return EMPTY_BYTES;
} finally {
bufferPool.release(buffer);
}
}

private void encode(ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter, Event event) throws IOException {
encode(prefix, event, cachedFormatter.getBuffer());
cachedFormatter.write(event);
encode(suffix, event, cachedFormatter.getBuffer());
cachedFormatter.getBuffer().write(lineSeparatorBytes);
}

private void encode(Encoder<Event> encoder, Event event, OutputStream outputStream) throws IOException {
if (encoder != null) {
byte[] data = encoder.encode(event);
Expand All @@ -117,7 +117,7 @@ public void start() {
}

super.start();
this.bufferPool = new ReusableByteBufferPool(this.minBufferSize);

formatter.setContext(getContext());
formatter.start();
charset = Charset.forName(formatter.getEncoding());
Expand All @@ -126,6 +126,8 @@ public void start() {
: this.lineSeparator.getBytes(charset);
startWrapped(prefix);
startWrapped(suffix);

this.formatterPool = new ReusableJsonFormatterPool<>(formatter, minBufferSize);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down Expand Up @@ -168,6 +170,8 @@ public void stop() {
formatter.stop();
stopWrapped(prefix);
stopWrapped(suffix);

this.formatterPool = null;
}
}

Expand Down
58 changes: 33 additions & 25 deletions src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Objects;

import net.logstash.logback.composite.CompositeJsonFormatter;
import net.logstash.logback.composite.JsonProviders;
import net.logstash.logback.decorate.JsonFactoryDecorator;
import net.logstash.logback.decorate.JsonGeneratorDecorator;
import net.logstash.logback.encoder.CompositeJsonEncoder;
import net.logstash.logback.encoder.SeparatorParser;
import net.logstash.logback.util.ReusableByteBuffer;
import net.logstash.logback.util.ReusableByteBufferPool;
import net.logstash.logback.util.ReusableJsonFormatterPool;

import ch.qos.logback.core.Layout;
import ch.qos.logback.core.LayoutBase;
Expand Down Expand Up @@ -61,17 +61,13 @@ public abstract class CompositeJsonLayout<Event extends DeferredProcessingAware>
*/
private int minBufferSize = 1024;

/**
* Provides reusable byte buffers (initialized when layout is started)
*/
private ReusableByteBufferPool bufferPool;


private final CompositeJsonFormatter<Event> formatter;

private ReusableJsonFormatterPool<Event> formatterPool;

public CompositeJsonLayout() {
super();
this.formatter = createFormatter();
this.formatter = Objects.requireNonNull(createFormatter());
}

protected abstract CompositeJsonFormatter<Event> createFormatter();
Expand All @@ -82,26 +78,29 @@ public String doLayout(Event event) {
throw new IllegalStateException("Layout is not started");
}

ReusableByteBuffer buffer = this.bufferPool.acquire();
try (OutputStreamWriter writer = new OutputStreamWriter(buffer)) {
try (ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) {
writeEvent(cachedFormatter, event);
return new String(cachedFormatter.getBuffer().toByteArray());

} catch (IOException e) {
addWarn("Error formatting logging event", e);
return null;

}
}

private void writeEvent(ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter, Event event) throws IOException {
try (Writer writer = new OutputStreamWriter(cachedFormatter.getBuffer())) {
brenuart marked this conversation as resolved.
Show resolved Hide resolved
writeLayout(prefix, writer, event);
writeFormatter(writer, event);
cachedFormatter.write(event);
writeLayout(suffix, writer, event);

if (lineSeparator != null) {
writer.write(lineSeparator);
}
writer.flush();

return new String(buffer.toByteArray());
} catch (IOException e) {
addWarn("Error formatting logging event", e);
return null;
} finally {
bufferPool.release(buffer);
}
}


private void writeLayout(Layout<Event> wrapped, Writer writer, Event event) throws IOException {
if (wrapped == null) {
Expand All @@ -111,22 +110,25 @@ private void writeLayout(Layout<Event> wrapped, Writer writer, Event event) thro
String str = wrapped.doLayout(event);
if (str != null) {
writer.write(str);
writer.flush();
}
}

private void writeFormatter(Writer writer, Event event) throws IOException {
this.formatter.writeEventToWriter(event, writer);
}


@Override
public void start() {
if (isStarted()) {
return;
}

super.start();
this.bufferPool = new ReusableByteBufferPool(this.minBufferSize);
formatter.setContext(getContext());
formatter.start();

startWrapped(prefix);
startWrapped(suffix);

this.formatterPool = new ReusableJsonFormatterPool<>(formatter, minBufferSize);
}

private void startWrapped(Layout<Event> wrapped) {
Expand All @@ -151,10 +153,16 @@ private void startWrapped(Layout<Event> wrapped) {

@Override
public void stop() {
if (!isStarted()) {
return;
}

super.stop();
formatter.stop();
stopWrapped(prefix);
stopWrapped(suffix);

this.formatterPool = null;
}

private void stopWrapped(Layout<Event> wrapped) {
Expand Down
Loading