Skip to content

Commit

Permalink
adding AsyncAppender
Browse files Browse the repository at this point in the history
  • Loading branch information
ceki committed May 10, 2012
1 parent 143811b commit c8c59cf
Show file tree
Hide file tree
Showing 3 changed files with 376 additions and 0 deletions.
206 changes: 206 additions & 0 deletions logback-core/src/main/java/ch/qos/logback/core/AsyncAppenderBase.java
@@ -0,0 +1,206 @@
/**
* Logback: the reliable, generic, fast and flexible logging framework.
* Copyright (C) 1999-2012, QOS.ch. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
* the Eclipse Foundation
*
* or (per the licensee's choosing)
*
* under the terms of the GNU Lesser General Public License version 2.1
* as published by the Free Software Foundation.
*/
package ch.qos.logback.core;

import ch.qos.logback.core.spi.AppenderAttachable;
import ch.qos.logback.core.spi.AppenderAttachableImpl;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* AsyncAppender lets users log events asynchronously. It uses a
* bounded buffer to store logging events. By default, if buffer is 80% full it will drop
* events meeting criteria as determined by the {@link #isDiscardable(Object)} method.
*
* <p>The AsyncAppender will collect the events sent to it and then
* dispatch them to all the appenders that are attached to it. You can
* attach multiple appenders to an AsyncAppender.
*
* <p>The AsyncAppender uses a separate thread to serve the events in
* its bounded buffer.
*
* @author Ceki G&uuml;lc&uuml;
* @param <E>
*/
public class AsyncAppenderBase<E> extends UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> {

AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
BlockingQueue<E> blockingQueue;

/** The default buffer size. */
public static final int DEFAULT_BUFFER_SIZE = 256;
int bufferSize = DEFAULT_BUFFER_SIZE;


static final int UNDEFINED = -1;
int discardingThreshold = UNDEFINED;

Worker worker = new Worker();

/**
* Is the eventObject passed as parameter discardable? The base class's implementation of this method always returns
* 'false' but sub-classes may (and do) override this method.
*
* <p>Note that only if the buffer is nearly full are events discarded. Otherwise, when the buffer is "not full"
* all events are logged.
*
* @param eventObject
* @return - true if the event can be discarded, false otherwise
*/
protected boolean isDiscardable(E eventObject) {
return false;
}


/**
* Pre-process the event prior to queueing. The base class does no pre-processing but sub-classes can
* override this behavior.
*
* @param eventObject
*/
protected void preprocess(E eventObject) {}


@Override
public void start() {
if(numberOfAttachedAppenders() == 0) {
addError("No attached appenders found.");
return;
}
blockingQueue = new ArrayBlockingQueue<E>(bufferSize);
addInfo("in start");
if(discardingThreshold == UNDEFINED)
discardingThreshold = bufferSize / 5;

worker.setDaemon(true);
worker.setName("AsyncAppender-Worker-" + worker.getName());
worker.start();
super.start();
}

@Override
public void stop() {
if(!isStarted())
return;

// mark this appender as stopped so that Worker can also stop if it is invoking aii.appendLoopOnAppenders
// and sub-appenders consume the interruption
super.stop();

// interrupt the worker thread so that it can terminate
// interruption can be consumed by sub-appenders
worker.interrupt();
try {
worker.join(1000);
} catch (InterruptedException e) {
addError("Failed to join worker thread", e); //To change body of catch statement use File | Settings | File Templates.
}
}

private int numberOfAttachedAppenders() {
int i = 0;
Iterator it = aai.iteratorForAppenders();
while(it.hasNext()) {
it.next();
i++;
}
return i;
}


@Override
protected void append(E eventObject) {
if(blockingQueue.remainingCapacity() < discardingThreshold && isDiscardable(eventObject)) {
return;
}
preprocess(eventObject);
put(eventObject);
}

private void put(E eventObject) {
try {
blockingQueue.put(eventObject);
} catch (InterruptedException e) {
}
}

public int getBufferSize() {
return bufferSize;
}

public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}

public int getDiscardingThreshold() {
return discardingThreshold;
}

public void setDiscardingThreshold(int discardingThreshold) {
this.discardingThreshold = discardingThreshold;
}

public void addAppender(Appender<E> newAppender) {
aai.addAppender(newAppender);
}

public Iterator<Appender<E>> iteratorForAppenders() {
return aai.iteratorForAppenders();
}

public Appender<E> getAppender(String name) {
return aai.getAppender(name);
}

public boolean isAttached(Appender<E> eAppender) {
return aai.isAttached(eAppender);
}

public void detachAndStopAllAppenders() {
aai.detachAndStopAllAppenders();
}

public boolean detachAppender(Appender<E> eAppender) {
return aai.detachAppender(eAppender);
}

public boolean detachAppender(String name) {
return aai.detachAppender(name);
}

class Worker extends Thread {

public void run() {
AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
AppenderAttachableImpl<E> aai = parent.aai;
while(parent.isStarted()) {
try {
E e = parent.blockingQueue.take();
aai.appendLoopOnAppenders(e);
} catch (InterruptedException ie) {
break;
}
}

addInfo("Worker thread will flush remaining events before exiting. ");
for(E e: parent.blockingQueue) {
aai.appendLoopOnAppenders(e);
}

aai.detachAndStopAllAppenders();
}
}
}
@@ -0,0 +1,136 @@
/**
* Logback: the reliable, generic, fast and flexible logging framework.
* Copyright (C) 1999-2012, QOS.ch. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
* the Eclipse Foundation
*
* or (per the licensee's choosing)
*
* under the terms of the GNU Lesser General Public License version 2.1
* as published by the Free Software Foundation.
*/
package ch.qos.logback.core;

import ch.qos.logback.core.read.ListAppender;
import ch.qos.logback.core.testUtil.DelayingListAppender;
import ch.qos.logback.core.status.OnConsoleStatusListener;
import ch.qos.logback.core.status.StatusChecker;
import ch.qos.logback.core.util.StatusPrinter;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class AsyncAppenderBaseTest {


Context context = new ContextBase();
AsyncAppenderBase<Integer> asyncAppenderBase = new AsyncAppenderBase<Integer>();
LossyAsyncAppender lossyAsyncAppender = new LossyAsyncAppender();
DelayingListAppender<Integer> delayingListAppender = new DelayingListAppender<Integer>();
ListAppender<Integer> listAppender = new ListAppender<Integer>();
OnConsoleStatusListener onConsoleStatusListener = new OnConsoleStatusListener();
StatusChecker statusChecker = new StatusChecker(context);

@Before
public void setUp() {
onConsoleStatusListener.setContext(context);
context.getStatusManager().add(onConsoleStatusListener);
onConsoleStatusListener.start();

asyncAppenderBase.setContext(context);
lossyAsyncAppender.setContext(context);

listAppender.setContext(context);
listAppender.setName("list");
listAppender.start();

delayingListAppender.setContext(context);
delayingListAppender.setName("list");
delayingListAppender.start();

}


@Test
public void smoke() {
asyncAppenderBase.addAppender(listAppender);
asyncAppenderBase.start();
asyncAppenderBase.doAppend(0);
asyncAppenderBase.stop();
verify(listAppender, 1);
}

@Test(timeout = 2000)
public void workerShouldStopEvenIfInterruptExceptionConsumedWithinSubappender() {
delayingListAppender.delay = 100;
asyncAppenderBase.addAppender(delayingListAppender);
asyncAppenderBase.start();
asyncAppenderBase.doAppend(0);
asyncAppenderBase.stop();
verify(delayingListAppender, 1);
assertTrue(delayingListAppender.interrupted);
}

@Test
public void noEventLoss() {
int bufferSize = 10;
int loopLen = bufferSize * 2;
asyncAppenderBase.addAppender(delayingListAppender);
asyncAppenderBase.setBufferSize(bufferSize);
asyncAppenderBase.start();
for (int i = 0; i < loopLen; i++) {
asyncAppenderBase.doAppend(i);
}
asyncAppenderBase.stop();
verify(delayingListAppender, loopLen);
}

@Test
public void lossyAppenderShouldOnlyLooseCertainEvents() {
int bufferSize = 5;
int loopLen = bufferSize * 2;
lossyAsyncAppender.addAppender(delayingListAppender);
lossyAsyncAppender.setBufferSize(bufferSize);
lossyAsyncAppender.setDiscardingThreshold(1);
lossyAsyncAppender.start();
for (int i = 0; i < loopLen; i++) {
lossyAsyncAppender.doAppend(i);
}
lossyAsyncAppender.stop();
verify(delayingListAppender, loopLen-2);
}

@Test
public void lossyAppenderShouldLooseNoneIfDiscardingThresholdIsZero() {
int bufferSize = 5;
int loopLen = bufferSize * 2;
lossyAsyncAppender.addAppender(delayingListAppender);
lossyAsyncAppender.setBufferSize(bufferSize);
lossyAsyncAppender.setDiscardingThreshold(0);
lossyAsyncAppender.start();
for (int i = 0; i < loopLen; i++) {
lossyAsyncAppender.doAppend(i);
}
lossyAsyncAppender.stop();
verify(delayingListAppender, loopLen);
}

private void verify(ListAppender la, int expectedSize) {
assertFalse(la.isStarted());
assertEquals(expectedSize, la.list.size());
assertTrue(statusChecker.isErrorFree(0));
assertTrue(statusChecker.containsMatch("Worker thread will flush remaining events before exiting."));
}

static class LossyAsyncAppender extends AsyncAppenderBase<Integer> {
@Override
protected boolean isDiscardable(Integer i) {
return (i % 3 == 0);
}
}
}
@@ -0,0 +1,34 @@
/**
* Logback: the reliable, generic, fast and flexible logging framework.
* Copyright (C) 1999-2012, QOS.ch. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
* the Eclipse Foundation
*
* or (per the licensee's choosing)
*
* under the terms of the GNU Lesser General Public License version 2.1
* as published by the Free Software Foundation.
*/

package ch.qos.logback.core.testUtil;

import ch.qos.logback.core.read.ListAppender;

public class DelayingListAppender<E> extends ListAppender<E> {

public int delay = 5;
public boolean interrupted = false;

@Override
public void append(E e) {
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
interrupted = true;
}
super.append(e);
}
}

0 comments on commit c8c59cf

Please sign in to comment.