Skip to content

Commit

Permalink
Make DelegatingAsyncDisruptorAppender more resilient to exceptions + …
Browse files Browse the repository at this point in the history
…flush Flushable appenders at end of batch (#457)

* Make DelegatingAsyncDisruptorAppender more resilient to exceptions thrown by child appenders

- Wrap calls to appenders within a try/catch block and make sure exceptions thrown by one does not prevent other appenders from receiving the event (gh456).
- Flush appenders at end of batch only if they are started (gh456).
- OutputStreamAppender may occasionally return a null OutputStream (gh455)
- Flush appenders implementing java.io.Flushable (gh454)
  • Loading branch information
brenuart committed Dec 30, 2020
1 parent 5026ff0 commit ede18db
Show file tree
Hide file tree
Showing 2 changed files with 291 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@
*/
package net.logstash.logback.appender;

import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;

import ch.qos.logback.core.OutputStreamAppender;
import net.logstash.logback.appender.listener.AppenderListener;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;

import ch.qos.logback.access.spi.IAccessEvent;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.AsyncAppenderBase;
import ch.qos.logback.core.OutputStreamAppender;
import ch.qos.logback.core.spi.AppenderAttachable;
import ch.qos.logback.core.spi.AppenderAttachableImpl;
import ch.qos.logback.core.spi.DeferredProcessingAware;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import net.logstash.logback.appender.listener.AppenderListener;

/**
* An {@link AsyncDisruptorAppender} that delegates appending of an event
Expand All @@ -37,6 +40,8 @@
* <ul>
* <li>it uses a {@link RingBuffer} instead of a {@link BlockingQueue}</li>
* <li>it allows any number of delegate appenders, instead of just one</li>
* <li>it flushes appenders of type {@link OutputStreamAppender} or {@link Flushable} at the end of a batch</li>
* <li>it is resilient to exceptions and guarantees that all appenders are invoked</li>
* </ul>
*
* @param <Event> type of event ({@link ILoggingEvent} or {@link IAccessEvent}).
Expand All @@ -49,28 +54,68 @@ public abstract class DelegatingAsyncDisruptorAppender<Event extends DeferredPro
private final AppenderAttachableImpl<Event> appenders = new AppenderAttachableImpl<Event>();

private class DelegatingEventHandler implements EventHandler<LogEvent<Event>> {
/**
* Whether exceptions should be reported with a error status or not.
*/
private boolean silentError;

@Override
public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {
appenders.appendLoopOnAppenders(logEvent.event);

/*
* Optimization:
*
* If any of the delegate appenders are instances of OutputStreamAppender,
* then flush the OutputStreams at the end of the batch.
*/
if (endOfBatch) {
for (Iterator<Appender<Event>> iter = appenders.iteratorForAppenders(); iter.hasNext(); ) {
Appender<Event> appender = iter.next();
if (appender instanceof OutputStreamAppender) {
OutputStreamAppender outputStreamAppender = (OutputStreamAppender) appender;
if (!outputStreamAppender.isImmediateFlush()) {
outputStreamAppender.getOutputStream().flush();
}

boolean exceptionThrown = false;
for(Iterator<Appender<Event>> it=appenders.iteratorForAppenders(); it.hasNext(); ) {
Appender<Event> appender = it.next();

try {
appender.doAppend(logEvent.event);

/*
* Optimization:
*
* If any of the delegate appenders are instances of OutputStreamAppender or Flushable,
* then flush them at the end of the batch.
*/
if (endOfBatch) {
flushAppender(appender);
}
}
catch(Exception e) {
exceptionThrown = true;
if (!this.silentError) {
addError(String.format("Unable to forward event to appender [%s]: %s", appender.getName(), e.getMessage()), e);
}
}
}

this.silentError = exceptionThrown;
}


private void flushAppender(Appender<Event> appender) throws IOException {
// Similar to #doAppend() - don't flush if appender is stopped
if (!appender.isStarted()) {
return;
}
if (appender instanceof Flushable) {
flushAppender((Flushable)appender);
}
else
if (appender instanceof OutputStreamAppender) {
flushAppender((OutputStreamAppender<Event>)appender);
}
}

private void flushAppender(OutputStreamAppender<Event> appender) throws IOException {
if (!appender.isImmediateFlush()) {
OutputStream os = appender.getOutputStream();
if (os != null) {
os.flush();
}
}
}

private void flushAppender(Flushable appender) throws IOException {
appender.flush();
}
}

Expand Down
Loading

0 comments on commit ede18db

Please sign in to comment.