Skip to content

Commit

Permalink
use readwritelock
Browse files Browse the repository at this point in the history
  • Loading branch information
trask committed Nov 17, 2023
1 parent 54fe766 commit de107e5
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.logback.appender.v1_0.internal.LoggingEventMapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.ILoggerFactory;
import org.slf4j.LoggerFactory;
Expand All @@ -37,6 +42,8 @@ public class OpenTelemetryAppender extends UnsynchronizedAppenderBase<ILoggingEv
private BlockingQueue<LoggingEventToReplay> eventsToReplay = new ArrayBlockingQueue<>(1000);
private final AtomicBoolean replayLimitWarningLogged = new AtomicBoolean();

private final ReadWriteLock lock = new ReentrantReadWriteLock();

public OpenTelemetryAppender() {}

/**
Expand Down Expand Up @@ -71,35 +78,32 @@ public void start() {
captureMarkerAttribute,
captureKeyValuePairAttributes,
captureLoggerContext);
if (openTelemetry == null) {
openTelemetry = OpenTelemetry.noop();
}
super.start();
}

@SuppressWarnings("SystemOut")
@Override
protected void append(ILoggingEvent event) {
// TODO(jean): Race condition to fix
// time=1 append() thread gets the no-op instance
// time=2 install() thread updates the instance and flushes the queue
// time=3 append() thread adds the log event to the queue
if (openTelemetry == OpenTelemetry.noop()) {
if (eventsToReplay.remainingCapacity() > 0) {
LoggingEventToReplay logEventToReplay =
new LoggingEventToReplay(event, captureExperimentalAttributes, captureCodeAttributes);
eventsToReplay.offer(logEventToReplay);
} else if (!replayLimitWarningLogged.getAndSet(true)) {
Lock readLock = lock.readLock();
readLock.lock();
try {
OpenTelemetry openTelemetry = this.openTelemetry;
if (openTelemetry != null) {
emit(openTelemetry, event);
return;
}

LoggingEventToReplay logEventToReplay =
new LoggingEventToReplay(event, captureExperimentalAttributes, captureCodeAttributes);

if (!eventsToReplay.offer(logEventToReplay) && !replayLimitWarningLogged.getAndSet(true)) {
String message =
"Log cache size of the OpenTelemetry appender is too small. firstLogsCacheSize value has to be increased;";
System.err.println(message);
}
return;
} finally {
readLock.unlock();
}
mapper.emit(
openTelemetry.getLogsBridge(),
new LoggingEventToReplay(event, captureExperimentalAttributes, captureCodeAttributes),
-1);
}

/**
Expand Down Expand Up @@ -173,13 +177,26 @@ public void setNumLogsCapturedBeforeOtelInstall(int size) {
* to function. See {@link #install(OpenTelemetry)} for simple installation option.
*/
public void setOpenTelemetry(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
LoggingEventToReplay eventToReplay;
while ((eventToReplay = eventsToReplay.poll()) != null) {
mapper.emit(openTelemetry.getLogsBridge(), eventToReplay, -1);
List<LoggingEventToReplay> eventsToReplay = new ArrayList<>();
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
// minimize scope of write lock
this.openTelemetry = openTelemetry;
this.eventsToReplay.drainTo(eventsToReplay);
} finally {
writeLock.unlock();
}
// now emit
for (LoggingEventToReplay eventToReplay : eventsToReplay) {
emit(openTelemetry, eventToReplay);
}
}

private void emit(OpenTelemetry openTelemetry, ILoggingEvent event) {
mapper.emit(openTelemetry.getLogsBridge(), event, -1);
}

// copied from SDK's DefaultConfigProperties
private static List<String> filterBlanksAndNulls(String[] values) {
return Arrays.stream(values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static void setupAll() {
resetLoggerContext();
}

private static void resetLoggerContext() {
static void resetLoggerContext() {
try {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
Field field = ContextBase.class.getDeclaredField("propertyMap");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,34 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.OpenTelemetry;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.util.ContextInitializer;
import ch.qos.logback.core.joran.spi.JoranException;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import java.net.URL;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;

class LogReplayOpenTelemetryAppenderTest extends AbstractOpenTelemetryAppenderTest {

@BeforeEach
void setup() {
void setup() throws JoranException {
generalBeforeEachSetup();
OpenTelemetryAppender.install(OpenTelemetry.noop());
// to make sure we start fresh with a new OpenTelemetryAppender for each test
reloadLoggerConfiguration();
}

private static void reloadLoggerConfiguration() throws JoranException {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
ContextInitializer ci = new ContextInitializer(loggerContext);
URL url = ci.findURLOfDefaultConfigurationFile(true);
loggerContext.reset();
ci.configureByResource(url);
// by default LoggerContext contains HOSTNAME property we clear it to start with empty context
resetLoggerContext();
}

@Override
Expand Down

0 comments on commit de107e5

Please sign in to comment.