Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.trace.internal.JcTools;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -140,7 +138,8 @@ void configureBatchLogRecordProcessor() {
assertThat(worker)
.extracting("queue")
.isInstanceOfSatisfying(
Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2));
ArrayBlockingQueue.class,
queue -> assertThat(queue.remainingCapacity()).isEqualTo(2));
Copy link
Member Author

@trask trask Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BatchLogRecordProcessor uses ArrayBlockingQueue, so this test abstraction wasn't needed (probably a copy paste from similar test for BatchSpanProcessor):

new ArrayBlockingQueue<>(maxQueueSize)); // TODO: use JcTools.newFixedSizeQueue(..)

assertThat(worker)
.extracting("logRecordExporter")
.isInstanceOf(SystemOutLogRecordExporter.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@

package io.opentelemetry.sdk.trace.internal;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscArrayQueue;
import org.jctools.queues.atomic.MpscAtomicArrayQueue;

/**
* Internal accessor of JCTools package for fast queues.
Expand All @@ -23,38 +18,19 @@
*/
public final class JcTools {

private static final AtomicBoolean queueCreationWarningLogged = new AtomicBoolean();
private static final Logger logger = Logger.getLogger(JcTools.class.getName());

/**
* Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer.
*/
public static <T> Queue<T> newFixedSizeQueue(int capacity) {
try {
return new MpscArrayQueue<>(capacity);
} catch (java.lang.NoClassDefFoundError | java.lang.ExceptionInInitializerError e) {
if (!queueCreationWarningLogged.getAndSet(true)) {
logger.log(
Level.WARNING,
"Cannot create high-performance queue, reverting to ArrayBlockingQueue ({0})",
Objects.toString(e, "unknown cause"));
}
// Happens when modules such as jdk.unsupported are disabled in a custom JRE distribution,
// or a security manager preventing access to Unsafe is installed.
return new ArrayBlockingQueue<>(capacity);
}
return new MpscAtomicArrayQueue<>(capacity);
Copy link
Member Author

@trask trask Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key insight here was from JCTools/JCTools#395 (comment):

Users can use atomic queues as an Unsafe free alternative (where possible)

I originally only used this implementation on Java 22+, just to avoid triggering the Unsafe warning

but given that the benchmarks look fine, I think it would be ok to go straight to this implementation in all cases and simplify things

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between MpscArrayQueue and MpscAtomicArrayQueue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

/**
* Returns the capacity of the {@link Queue}. We cast to the implementation so callers do not need
* to use the shaded classes.
*/
public static long capacity(Queue<?> queue) {
if (queue instanceof MessagePassingQueue) {
return ((MessagePassingQueue<?>) queue).capacity();
} else {
return (long) ((ArrayBlockingQueue<?>) queue).remainingCapacity() + queue.size();
}
return ((MessagePassingQueue<?>) queue).capacity();
}

/**
Expand All @@ -65,22 +41,7 @@ public static long capacity(Queue<?> queue) {
*/
@SuppressWarnings("unchecked")
public static <T> int drain(Queue<T> queue, int limit, Consumer<T> consumer) {
if (queue instanceof MessagePassingQueue) {
return ((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
} else {
return drainNonJcQueue(queue, limit, consumer);
}
}

private static <T> int drainNonJcQueue(
Queue<T> queue, int maxExportBatchSize, Consumer<T> consumer) {
int polledCount = 0;
T item;
while (polledCount < maxExportBatchSize && (item = queue.poll()) != null) {
consumer.accept(item);
++polledCount;
}
return polledCount;
return ((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
}

private JcTools() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import org.jctools.queues.atomic.MpscAtomicArrayQueue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnJre;
import org.junit.jupiter.api.condition.JRE;
Expand All @@ -30,7 +30,7 @@ void newFixedSizeQueue_SunMiscProhibited() {
Queue<Object> queue =
AccessController.doPrivileged(
(PrivilegedAction<Queue<Object>>) () -> JcTools.newFixedSizeQueue(10));
assertThat(queue).isInstanceOf(ArrayBlockingQueue.class);
assertThat(queue).isInstanceOf(MpscAtomicArrayQueue.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was going to say that this test is no longer relevant. But I suppose its still good to assert htat MpscAtomicArrayQueue doesn't rely on unsafe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, though I think we could revisit and probably delete it after #7683

} finally {
System.setSecurityManager(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,18 @@

import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import org.jctools.queues.MpscArrayQueue;
import org.jctools.queues.MessagePassingQueue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class JcToolsTest {

ArrayList<String> batch = new ArrayList<>(10);

@Test
void drain_ArrayBlockingQueue() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to test ArrayBlockingQueue anymore with this change

// Arrange
batch.add("Test3");
Queue<String> queue = new ArrayBlockingQueue<>(10);
queue.add("Test1");
queue.add("Test2");

// Act
JcTools.drain(queue, 5, batch::add);

// Assert
assertThat(batch).hasSize(3);
assertThat(queue).hasSize(0);
}

@Test
void drain_MessagePassingQueue() {
// Arrange
batch.add("Test3");
Queue<String> queue = new MpscArrayQueue<>(10);
Queue<String> queue = JcTools.newFixedSizeQueue(10);
queue.add("Test1");
queue.add("Test2");

Expand All @@ -58,7 +35,7 @@ void drain_MessagePassingQueue() {
@Test
void drain_MaxBatch() {
// Arrange
Queue<String> queue = new MpscArrayQueue<>(10);
Queue<String> queue = JcTools.newFixedSizeQueue(10);
queue.add("Test1");
queue.add("Test2");

Expand All @@ -79,7 +56,7 @@ void newFixedSize_MpscQueue() {
Queue<Object> objects = JcTools.newFixedSizeQueue(capacity);

// Assert
assertThat(objects).isInstanceOf(MpscArrayQueue.class);
assertThat(objects).isInstanceOf(MessagePassingQueue.class);
}

@Test
Expand All @@ -94,16 +71,4 @@ void capacity_MpscQueue() {
// Assert
assertThat(queueSize).isGreaterThan(capacity);
}

@Test
void capacity_ArrayBlockingQueue() {
// Arrange
Queue<String> queue = new ArrayBlockingQueue<>(10);

// Act
long queueSize = JcTools.capacity(queue);

// Assert
assertThat(queueSize).isEqualTo(10);
}
}
Loading