Skip to content

Commit

Permalink
Basic lifecycle added
Browse files Browse the repository at this point in the history
* LifeCycle interface added to some Core classes
* Operation API added to handle deferred setup tasks (#12)
* IndexTemplate failure doesn't prevent startup anymore
  • Loading branch information
rfoltyns committed Jan 28, 2019
1 parent 3d31e48 commit f6e8316
Show file tree
Hide file tree
Showing 19 changed files with 718 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ public class AsyncBatchDelivery implements BatchDelivery<String> {

private static final Logger LOG = StatusLogger.getLogger();

private volatile State state = State.STOPPED;

private final BatchOperations batchOperations;
private final BatchEmitter batchEmitter;

private final IndexTemplate indexTemplate;
private final ClientObjectFactory<Object, Object> objectFactory;

public AsyncBatchDelivery(int batchSize, int deliveryInterval, ClientObjectFactory objectFactory, FailoverPolicy failoverPolicy, IndexTemplate indexTemplate) {
this.batchOperations = objectFactory.createBatchOperations();
this.batchEmitter = createBatchEmitterServiceProvider()
Expand All @@ -52,9 +57,8 @@ public AsyncBatchDelivery(int batchSize, int deliveryInterval, ClientObjectFacto
deliveryInterval,
objectFactory,
failoverPolicy);
if (indexTemplate != null) {
objectFactory.execute(indexTemplate);
}
this.indexTemplate = indexTemplate;
this.objectFactory = objectFactory;
}

/**
Expand Down Expand Up @@ -149,4 +153,33 @@ public Builder withIndexTemplate(IndexTemplate indexTemplate) {

}

// ==========
// LIFECYCLE
// ==========

@Override
public void start() {
if (indexTemplate != null) {
objectFactory.addOperation(() -> objectFactory.execute(indexTemplate));
}
batchEmitter.start();
state = State.STARTED;
}

@Override
public void stop() {
batchEmitter.stop();
state = State.STOPPED;
}

@Override
public boolean isStarted() {
return state == State.STARTED;
}

@Override
public boolean isStopped() {
return state == State.STOPPED;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*
* @param <T> type of accepted items
*/
public interface BatchDelivery<T> {
public interface BatchDelivery<T> extends LifeCycle {

String ELEMENT_TYPE = "batchDelivery";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* @param <T> type of accepted batch item
*/
public interface BatchEmitter<T> {
public interface BatchEmitter<T> extends LifeCycle {

/**
* @param batchItem batch item to be processed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
*/
public class BulkEmitter<BATCH_TYPE> implements BatchEmitter {

private volatile State state = State.STOPPED;

private final AtomicInteger size = new AtomicInteger();

private final int maxSize;
Expand Down Expand Up @@ -98,6 +100,30 @@ public void addListener(Function<BATCH_TYPE, Boolean> onReadyListener) {
this.listener = onReadyListener;
}

// ==========
// LIFECYCLE
// ==========

@Override
public void start() {
state = State.STARTED;
}

@Override
public void stop() {
state = State.STOPPED;
}

@Override
public boolean isStarted() {
return state == State.STARTED;
}

@Override
public boolean isStopped() {
return state == State.STOPPED;
}

/*
* Class used as monitor to increase lock visibility in profiling tools
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,17 @@ public interface ClientObjectFactory<CLIENT_TYPE, BATCH_TYPE> {
/**
* Updates target with index template
* @param indexTemplate index template request
* @deprecated will be replaced by {@link #addOperation(Operation)} in future releases
*/
@Deprecated
void execute(IndexTemplate indexTemplate);

/**
* Allows to add operation to be executed before next batch. Exact time of the execution depends on implementation of this factory.
*
* NOTE: {@code default} added for backwards compatibility. {@code default} will be removed future releases
* @param operation operation to be executed
*/
default void addOperation(Operation operation) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.core.layout.AbstractLayout;
import org.apache.logging.log4j.core.layout.AbstractStringLayout;


/**
* Plugin class responsible for delivery of incoming {@link LogEvent}(s) to {@link BatchDelivery} implementation.
* <p>
Expand All @@ -51,8 +52,8 @@ public class ElasticsearchAppender extends AbstractAppender {

public static final String PLUGIN_NAME = "Elasticsearch";

private IndexNameFormatter indexNameFormatter;
private final ItemAppender<LogEvent> itemAppender;
private final IndexNameFormatter indexNameFormatter;
private final ItemAppender itemAppender;

protected ElasticsearchAppender(String name, Filter filter, AbstractLayout layout,
boolean ignoreExceptions, BatchDelivery batchDelivery, boolean messageOnly, IndexNameFormatter indexNameFormatter) {
Expand All @@ -71,6 +72,18 @@ public void append(LogEvent event) {
itemAppender.append(formattedIndexName, event);
}

@Override
public void start() {
itemAppender.start();
super.start();
}

@Override
public void stop() {
itemAppender.stop();
super.stop();
}

@PluginBuilderFactory
public static Builder newBuilder() {
return new Builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*
* @param <T> log source type
*/
public interface ItemAppender<T> {
public interface ItemAppender<T> extends LifeCycle {

void append(String formattedIndexName, T source);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*/
public class ItemSourceAppender implements ItemAppender<LogEvent> {

private volatile State state = State.STOPPED;

private final BatchDelivery batchDelivery;
private final Function<LogEvent, ItemSource> serializer;

Expand All @@ -48,4 +50,28 @@ public final void append(String formattedIndexName, LogEvent event) {
batchDelivery.add(formattedIndexName, serializer.apply(event));
}

@Override
public void start() {
batchDelivery.start();
state = State.STARTED;
}

@Override
public void stop() {
if (batchDelivery.isStarted()) {
batchDelivery.stop();
}
state = State.STOPPED;
}

@Override
public boolean isStarted() {
return state == State.STARTED;
}

@Override
public boolean isStopped() {
return state == State.STOPPED;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.appenders.log4j2.elasticsearch;

public interface LifeCycle {

void start();

void stop();

boolean isStarted();

boolean isStopped();

enum State {
STARTED, STOPPED
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.appenders.log4j2.elasticsearch;

public interface Operation {

void execute() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*/
public class StringAppender implements ItemAppender<LogEvent> {

private volatile State state = State.STOPPED;

private final BatchDelivery batchDelivery;
private final Function<LogEvent, String> serializer;

Expand All @@ -48,4 +50,32 @@ public void append(String formattedIndexName, LogEvent logEvent) {
batchDelivery.add(formattedIndexName, serializer.apply(logEvent));
}

// ==========
// LIFECYCLE
// ==========

@Override
public void start() {
batchDelivery.start();
state = State.STARTED;
}

@Override
public void stop() {
if (batchDelivery.isStarted()) {
batchDelivery.stop();
}
state = State.STOPPED;
}

@Override
public boolean isStarted() {
return state == State.STARTED;
}

@Override
public boolean isStopped() {
return state == State.STOPPED;
}

}

0 comments on commit f6e8316

Please sign in to comment.