Skip to content

Commit

Permalink
BatchLogRecordProcessor supports concurrent export
Browse files Browse the repository at this point in the history
  • Loading branch information
qixiaogang committed Apr 10, 2024
1 parent 1f80fe8 commit 044df86
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -69,6 +73,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
LogRecordExporter logRecordExporter,
MeterProvider meterProvider,
long scheduleDelayNanos,
int maxConcurrentExport,
int maxQueueSize,
int maxExportBatchSize,
long exporterTimeoutNanos) {
Expand All @@ -77,6 +82,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
logRecordExporter,
meterProvider,
scheduleDelayNanos,
maxConcurrentExport,
maxExportBatchSize,
exporterTimeoutNanos,
new ArrayBlockingQueue<>(maxQueueSize)); // TODO: use JcTools.newFixedSizeQueue(..)
Expand Down Expand Up @@ -146,6 +152,8 @@ private static final class Worker implements Runnable {

private final LogRecordExporter logRecordExporter;
private final long scheduleDelayNanos;
private final int maxConcurrentExport;
private final Semaphore semaphore;
private final int maxExportBatchSize;
private final long exporterTimeoutNanos;

Expand All @@ -163,11 +171,13 @@ private static final class Worker implements Runnable {
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
private volatile boolean continueWork = true;
private final ArrayList<LogRecordData> batch;
private final ExecutorService executor;

private Worker(
LogRecordExporter logRecordExporter,
MeterProvider meterProvider,
long scheduleDelayNanos,
int maxConcurrentExport,
int maxExportBatchSize,
long exporterTimeoutNanos,
Queue<ReadWriteLogRecord> queue) {
Expand Down Expand Up @@ -211,6 +221,9 @@ private Worker(
false);

this.batch = new ArrayList<>(this.maxExportBatchSize);
this.maxConcurrentExport = maxConcurrentExport;
this.semaphore = new Semaphore(maxConcurrentExport);
this.executor = Executors.newFixedThreadPool(maxConcurrentExport);
}

private void addLog(ReadWriteLogRecord logData) {
Expand Down Expand Up @@ -278,6 +291,7 @@ private void updateNextExportTime() {
}

private CompletableResultCode shutdown() {

CompletableResultCode result = new CompletableResultCode();

CompletableResultCode flushResult = forceFlush();
Expand All @@ -295,6 +309,15 @@ private CompletableResultCode shutdown() {
});
});

for (int i = 0; i < maxConcurrentExport; i++) {
try {
semaphore.acquire();
} catch (InterruptedException e){
logger.log(Level.WARNING, "Acquire Semaphore for shutdown failed", e);
}
}
executor.shutdown();

return result;
}

Expand All @@ -317,13 +340,41 @@ private void exportCurrentBatch() {
}

try {
CompletableResultCode result =
logRecordExporter.export(Collections.unmodifiableList(batch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
processedLogsCounter.add(batch.size(), exportedAttrs);
if (this.maxConcurrentExport == 1) {
CompletableResultCode result =
logRecordExporter.export(Collections.unmodifiableList(batch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
processedLogsCounter.add(batch.size(), exportedAttrs);
} else {
logger.log(Level.FINE, "Exporter failed");
}
} else {
logger.log(Level.FINE, "Exporter failed");
try {
semaphore.acquire();
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Acquire semaphore for exporter failed", e);
return;
}
ArrayList<LogRecordData> exportBatch = new ArrayList<>(batch.size());
exportBatch.addAll(batch);
Future<?> future = executor.submit(() -> {
try {
CompletableResultCode result =
logRecordExporter.export(Collections.unmodifiableList(exportBatch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
processedLogsCounter.add(exportBatch.size(), exportedAttrs);
} else {
logger.log(Level.FINE, "Exporter failed");
}
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
} finally {
semaphore.release();
}
});
future.isDone();
}
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public final class BatchLogRecordProcessorBuilder {

// Visible for testing
static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 1000;
static final int DEFAULT_MAX_CONCURRENT_EXPORT = 1;
// Visible for testing
static final int DEFAULT_MAX_QUEUE_SIZE = 2048;
// Visible for testing
Expand All @@ -30,6 +31,7 @@ public final class BatchLogRecordProcessorBuilder {

private final LogRecordExporter logRecordExporter;
private long scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_SCHEDULE_DELAY_MILLIS);
private int maxConcurrentExport = DEFAULT_MAX_CONCURRENT_EXPORT;
private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE;
private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS);
Expand Down Expand Up @@ -64,6 +66,20 @@ long getScheduleDelayNanos() {
return scheduleDelayNanos;
}

/**
* Sets the max numbers for concurrent export. If unset, defaults to {@value
* DEFAULT_MAX_CONCURRENT_EXPORT
*/
public BatchLogRecordProcessorBuilder setMaxConcurrentExport(int maxConcurrentExport) {
checkArgument(maxConcurrentExport > 0, "maxConcurrentExport must be non-negative");
this.maxConcurrentExport = maxConcurrentExport;
return this;
}

int getMaxConcurrentExport() {
return maxConcurrentExport;
}

/**
* Sets the maximum time an export will be allowed to run before being cancelled. If unset,
* defaults to {@value DEFAULT_EXPORT_TIMEOUT_MILLIS}ms.
Expand Down Expand Up @@ -152,6 +168,7 @@ public BatchLogRecordProcessor build() {
logRecordExporter,
meterProvider,
scheduleDelayNanos,
maxConcurrentExport,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutNanos);
Expand Down

0 comments on commit 044df86

Please sign in to comment.