diff --git a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfigurationTest.java b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfigurationTest.java index 7a5473e8dea..9609f8cb4d3 100644 --- a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfigurationTest.java @@ -19,7 +19,6 @@ import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor; import io.opentelemetry.sdk.logs.export.LogRecordExporter; import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; -import io.opentelemetry.sdk.trace.internal.JcTools; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; @@ -27,7 +26,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -140,7 +138,8 @@ void configureBatchLogRecordProcessor() { assertThat(worker) .extracting("queue") .isInstanceOfSatisfying( - Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2)); + ArrayBlockingQueue.class, + queue -> assertThat(queue.remainingCapacity()).isEqualTo(2)); assertThat(worker) .extracting("logRecordExporter") .isInstanceOf(SystemOutLogRecordExporter.class); diff --git a/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java index fa009ebdc0f..1ea3a6d8778 100644 --- a/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java +++ b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java @@ -5,15 +5,10 @@ package io.opentelemetry.sdk.trace.internal; -import java.util.Objects; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.logging.Level; -import java.util.logging.Logger; import org.jctools.queues.MessagePassingQueue; -import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.atomic.MpscAtomicArrayQueue; /** * Internal accessor of JCTools package for fast queues. @@ -23,26 +18,11 @@ */ public final class JcTools { - private static final AtomicBoolean queueCreationWarningLogged = new AtomicBoolean(); - private static final Logger logger = Logger.getLogger(JcTools.class.getName()); - /** * Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer. */ public static Queue newFixedSizeQueue(int capacity) { - try { - return new MpscArrayQueue<>(capacity); - } catch (java.lang.NoClassDefFoundError | java.lang.ExceptionInInitializerError e) { - if (!queueCreationWarningLogged.getAndSet(true)) { - logger.log( - Level.WARNING, - "Cannot create high-performance queue, reverting to ArrayBlockingQueue ({0})", - Objects.toString(e, "unknown cause")); - } - // Happens when modules such as jdk.unsupported are disabled in a custom JRE distribution, - // or a security manager preventing access to Unsafe is installed. - return new ArrayBlockingQueue<>(capacity); - } + return new MpscAtomicArrayQueue<>(capacity); } /** @@ -50,11 +30,7 @@ public static Queue newFixedSizeQueue(int capacity) { * to use the shaded classes. */ public static long capacity(Queue queue) { - if (queue instanceof MessagePassingQueue) { - return ((MessagePassingQueue) queue).capacity(); - } else { - return (long) ((ArrayBlockingQueue) queue).remainingCapacity() + queue.size(); - } + return ((MessagePassingQueue) queue).capacity(); } /** @@ -65,22 +41,7 @@ public static long capacity(Queue queue) { */ @SuppressWarnings("unchecked") public static int drain(Queue queue, int limit, Consumer consumer) { - if (queue instanceof MessagePassingQueue) { - return ((MessagePassingQueue) queue).drain(consumer::accept, limit); - } else { - return drainNonJcQueue(queue, limit, consumer); - } - } - - private static int drainNonJcQueue( - Queue queue, int maxExportBatchSize, Consumer consumer) { - int polledCount = 0; - T item; - while (polledCount < maxExportBatchSize && (item = queue.poll()) != null) { - consumer.accept(item); - ++polledCount; - } - return polledCount; + return ((MessagePassingQueue) queue).drain(consumer::accept, limit); } private JcTools() {} diff --git a/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsSecurityManagerTest.java b/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsSecurityManagerTest.java index eb2bd9199e3..2aad0ee64e7 100644 --- a/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsSecurityManagerTest.java +++ b/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsSecurityManagerTest.java @@ -11,7 +11,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; +import org.jctools.queues.atomic.MpscAtomicArrayQueue; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledOnJre; import org.junit.jupiter.api.condition.JRE; @@ -30,7 +30,7 @@ void newFixedSizeQueue_SunMiscProhibited() { Queue queue = AccessController.doPrivileged( (PrivilegedAction>) () -> JcTools.newFixedSizeQueue(10)); - assertThat(queue).isInstanceOf(ArrayBlockingQueue.class); + assertThat(queue).isInstanceOf(MpscAtomicArrayQueue.class); } finally { System.setSecurityManager(null); } diff --git a/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsTest.java b/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsTest.java index edb0168d2ce..1fad53021da 100644 --- a/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsTest.java +++ b/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsTest.java @@ -9,41 +9,18 @@ import java.util.ArrayList; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.MessagePassingQueue; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; -@ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.LENIENT) class JcToolsTest { ArrayList batch = new ArrayList<>(10); - @Test - void drain_ArrayBlockingQueue() { - // Arrange - batch.add("Test3"); - Queue queue = new ArrayBlockingQueue<>(10); - queue.add("Test1"); - queue.add("Test2"); - - // Act - JcTools.drain(queue, 5, batch::add); - - // Assert - assertThat(batch).hasSize(3); - assertThat(queue).hasSize(0); - } - @Test void drain_MessagePassingQueue() { // Arrange batch.add("Test3"); - Queue queue = new MpscArrayQueue<>(10); + Queue queue = JcTools.newFixedSizeQueue(10); queue.add("Test1"); queue.add("Test2"); @@ -58,7 +35,7 @@ void drain_MessagePassingQueue() { @Test void drain_MaxBatch() { // Arrange - Queue queue = new MpscArrayQueue<>(10); + Queue queue = JcTools.newFixedSizeQueue(10); queue.add("Test1"); queue.add("Test2"); @@ -79,7 +56,7 @@ void newFixedSize_MpscQueue() { Queue objects = JcTools.newFixedSizeQueue(capacity); // Assert - assertThat(objects).isInstanceOf(MpscArrayQueue.class); + assertThat(objects).isInstanceOf(MessagePassingQueue.class); } @Test @@ -94,16 +71,4 @@ void capacity_MpscQueue() { // Assert assertThat(queueSize).isGreaterThan(capacity); } - - @Test - void capacity_ArrayBlockingQueue() { - // Arrange - Queue queue = new ArrayBlockingQueue<>(10); - - // Act - long queueSize = JcTools.capacity(queue); - - // Assert - assertThat(queueSize).isEqualTo(10); - } }