diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseLinkedQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseLinkedQueue.java index 6939b0f7a..a99ef8a49 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseLinkedQueue.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseLinkedQueue.java @@ -13,15 +13,30 @@ */ package io.rsocket.internal.jctools.queues; -import static io.rsocket.internal.jctools.util.UnsafeAccess.UNSAFE; -import static io.rsocket.internal.jctools.util.UnsafeAccess.fieldOffset; +import static io.rsocket.internal.jctools.queues.UnsafeAccess.UNSAFE; +import static io.rsocket.internal.jctools.queues.UnsafeAccess.fieldOffset; import java.util.AbstractQueue; import java.util.Iterator; abstract class BaseLinkedQueuePad0 extends AbstractQueue implements MessagePassingQueue { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16; + byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b + byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b + byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b + byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b + byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b + byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b + byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b + byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b + byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b + byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b + byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b + byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b + byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b + byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b + byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b + // byte b170,b171,b172,b173,b174,b175,b176,b177;//128b + // * drop 8b as object header acts as padding and is >= 8b * } // $gen:ordered-fields @@ -29,18 +44,20 @@ abstract class BaseLinkedQueueProducerNodeRef extends BaseLinkedQueuePad0 static final long P_NODE_OFFSET = fieldOffset(BaseLinkedQueueProducerNodeRef.class, "producerNode"); - private LinkedQueueNode producerNode; + private volatile LinkedQueueNode producerNode; final void spProducerNode(LinkedQueueNode newValue) { - producerNode = newValue; + UNSAFE.putObject(this, P_NODE_OFFSET, newValue); + } + + final void soProducerNode(LinkedQueueNode newValue) { + UNSAFE.putOrderedObject(this, P_NODE_OFFSET, newValue); } - @SuppressWarnings("unchecked") final LinkedQueueNode lvProducerNode() { - return (LinkedQueueNode) UNSAFE.getObjectVolatile(this, P_NODE_OFFSET); + return producerNode; } - @SuppressWarnings("unchecked") final boolean casProducerNode(LinkedQueueNode expect, LinkedQueueNode newValue) { return UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, expect, newValue); } @@ -51,8 +68,22 @@ final LinkedQueueNode lpProducerNode() { } abstract class BaseLinkedQueuePad1 extends BaseLinkedQueueProducerNodeRef { - long p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16, p17; + byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b + byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b + byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b + byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b + byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b + byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b + byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b + byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b + byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b + byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b + byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b + byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b + byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b + byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b + byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b + byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b } // $gen:ordered-fields @@ -77,16 +108,27 @@ final LinkedQueueNode lpConsumerNode() { } abstract class BaseLinkedQueuePad2 extends BaseLinkedQueueConsumerNodeRef { - long p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16, p17; + byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b + byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b + byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b + byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b + byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b + byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b + byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b + byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b + byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b + byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b + byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b + byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b + byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b + byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b + byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b + byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b } /** * A base data structure for concurrent linked queues. For convenience also pulled in common single * consumer methods since at this time there's no plan to implement MC. - * - * @param - * @author nitsanw */ abstract class BaseLinkedQueue extends BaseLinkedQueuePad2 { @@ -158,8 +200,10 @@ public final int size() { * @see MessagePassingQueue#isEmpty() */ @Override - public final boolean isEmpty() { - return lvConsumerNode() == lvProducerNode(); + public boolean isEmpty() { + LinkedQueueNode consumerNode = lvConsumerNode(); + LinkedQueueNode producerNode = lvProducerNode(); + return consumerNode == producerNode; } protected E getSingleConsumerNodeValue( diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseMpscLinkedArrayQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseMpscLinkedArrayQueue.java index 635779df3..cfad5ef71 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseMpscLinkedArrayQueue.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseMpscLinkedArrayQueue.java @@ -13,25 +13,38 @@ */ package io.rsocket.internal.jctools.queues; -import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.allocate; import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.length; -import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.modifiedCalcElementOffset; -import static io.rsocket.internal.jctools.util.UnsafeAccess.UNSAFE; -import static io.rsocket.internal.jctools.util.UnsafeAccess.fieldOffset; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.calcElementOffset; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.lvElement; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.soElement; +import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.modifiedCalcCircularRefElementOffset; +import static io.rsocket.internal.jctools.queues.UnsafeAccess.UNSAFE; +import static io.rsocket.internal.jctools.queues.UnsafeAccess.fieldOffset; +import static io.rsocket.internal.jctools.queues.UnsafeRefArrayAccess.allocateRefArray; +import static io.rsocket.internal.jctools.queues.UnsafeRefArrayAccess.calcCircularRefElementOffset; +import static io.rsocket.internal.jctools.queues.UnsafeRefArrayAccess.calcRefElementOffset; +import static io.rsocket.internal.jctools.queues.UnsafeRefArrayAccess.lvRefElement; +import static io.rsocket.internal.jctools.queues.UnsafeRefArrayAccess.soRefElement; import io.rsocket.internal.jctools.queues.IndexedQueueSizeUtil.IndexedQueue; -import io.rsocket.internal.jctools.util.PortableJvmInfo; -import io.rsocket.internal.jctools.util.Pow2; -import io.rsocket.internal.jctools.util.RangeUtil; import java.util.AbstractQueue; import java.util.Iterator; +import java.util.NoSuchElementException; abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue implements IndexedQueue { - long p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16, p17; + byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b + byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b + byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b + byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b + byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b + byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b + byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b + byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b + byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b + byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b + byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b + byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b + byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b + byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b + byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b + byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b } // $gen:ordered-fields @@ -56,8 +69,22 @@ final boolean casProducerIndex(long expect, long newValue) { } abstract class BaseMpscLinkedArrayQueuePad2 extends BaseMpscLinkedArrayQueueProducerFields { - long p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16, p17; + byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b + byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b + byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b + byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b + byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b + byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b + byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b + byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b + byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b + byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b + byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b + byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b + byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b + byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b + byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b + byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b } // $gen:ordered-fields @@ -84,8 +111,22 @@ final void soConsumerIndex(long newValue) { } abstract class BaseMpscLinkedArrayQueuePad3 extends BaseMpscLinkedArrayQueueConsumerFields { - long p0, p1, p2, p3, p4, p5, p6, p7; - long p10, p11, p12, p13, p14, p15, p16, p17; + byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b + byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b + byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b + byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b + byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b + byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b + byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b + byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b + byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b + byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b + byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b + byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b + byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b + byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b + byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b + byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b } // $gen:ordered-fields @@ -115,12 +156,9 @@ final void soProducerLimit(long newValue) { * An MPSC array queue which starts at initialCapacity and grows to maxCapacity in * linked chunks of the initial size. The queue grows only when the current buffer is full and * elements are not copied on resize, instead a link to the new buffer is stored in the old buffer - * for the consumer to follow.
- * - * @param + * for the consumer to follow. */ -public abstract class BaseMpscLinkedArrayQueue - extends BaseMpscLinkedArrayQueueColdProducerFields +abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields implements MessagePassingQueue, QueueProgressIndicators { // No post padding here, subclasses must add private static final Object JUMP = new Object(); @@ -141,7 +179,7 @@ public BaseMpscLinkedArrayQueue(final int initialCapacity) { // leave lower bit of mask clear long mask = (p2capacity - 1) << 1; // need extra element to point at next array - E[] buffer = allocate(p2capacity + 1); + E[] buffer = allocateRefArray(p2capacity + 1); producerBuffer = buffer; producerMask = mask; consumerBuffer = buffer; @@ -150,7 +188,7 @@ public BaseMpscLinkedArrayQueue(final int initialCapacity) { } @Override - public final int size() { + public int size() { // NOTE: because indices are on even numbers we cannot use the size util. /* @@ -181,7 +219,7 @@ public final int size() { } @Override - public final boolean isEmpty() { + public boolean isEmpty() { // Order matters! // Loading consumer before producer allows for producer increments after consumer index is read. // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there @@ -240,8 +278,8 @@ public boolean offer(final E e) { } } // INDEX visible before ELEMENT - final long offset = modifiedCalcElementOffset(pIndex, mask); - soElement(buffer, offset, e); // release element e + final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask); + soRefElement(buffer, offset, e); // release element e return true; } @@ -257,8 +295,8 @@ public E poll() { final long index = lpConsumerIndex(); final long mask = consumerMask; - final long offset = modifiedCalcElementOffset(index, mask); - Object e = lvElement(buffer, offset); // LoadLoad + final long offset = modifiedCalcCircularRefElementOffset(index, mask); + Object e = lvRefElement(buffer, offset); if (e == null) { if (index != lvProducerIndex()) { // poll() == null iff queue is empty, null element is not strong enough indicator, so we @@ -266,7 +304,7 @@ public E poll() { // check the producer index. If the queue is indeed not empty we spin until element is // visible. do { - e = lvElement(buffer, offset); + e = lvRefElement(buffer, offset); } while (e == null); } else { return null; @@ -278,7 +316,7 @@ public E poll() { return newBufferPoll(nextBuffer, index); } - soElement(buffer, offset, null); // release element null + soRefElement(buffer, offset, null); // release element null soConsumerIndex(index + 2); // release cIndex return (E) e; } @@ -295,14 +333,14 @@ public E peek() { final long index = lpConsumerIndex(); final long mask = consumerMask; - final long offset = modifiedCalcElementOffset(index, mask); - Object e = lvElement(buffer, offset); // LoadLoad + final long offset = modifiedCalcCircularRefElementOffset(index, mask); + Object e = lvRefElement(buffer, offset); if (e == null && index != lvProducerIndex()) { // peek() == null iff queue is empty, null element is not strong enough indicator, so we must // check the producer index. If the queue is indeed not empty we spin until element is // visible. do { - e = lvElement(buffer, offset); + e = lvRefElement(buffer, offset); } while (e == null); } if (e == JUMP) { @@ -346,31 +384,31 @@ else if (casProducerIndex(pIndex, pIndex + 1)) { @SuppressWarnings("unchecked") private E[] nextBuffer(final E[] buffer, final long mask) { final long offset = nextArrayOffset(mask); - final E[] nextBuffer = (E[]) lvElement(buffer, offset); + final E[] nextBuffer = (E[]) lvRefElement(buffer, offset); consumerBuffer = nextBuffer; consumerMask = (length(nextBuffer) - 2) << 1; - soElement(buffer, offset, BUFFER_CONSUMED); + soRefElement(buffer, offset, BUFFER_CONSUMED); return nextBuffer; } - private long nextArrayOffset(long mask) { - return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE); + private static long nextArrayOffset(long mask) { + return modifiedCalcCircularRefElementOffset(mask + 2, Long.MAX_VALUE); } private E newBufferPoll(E[] nextBuffer, long index) { - final long offset = modifiedCalcElementOffset(index, consumerMask); - final E n = lvElement(nextBuffer, offset); // LoadLoad + final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask); + final E n = lvRefElement(nextBuffer, offset); if (n == null) { throw new IllegalStateException("new buffer must have at least one element"); } - soElement(nextBuffer, offset, null); // StoreStore + soRefElement(nextBuffer, offset, null); soConsumerIndex(index + 2); return n; } private E newBufferPeek(E[] nextBuffer, long index) { - final long offset = modifiedCalcElementOffset(index, consumerMask); - final E n = lvElement(nextBuffer, offset); // LoadLoad + final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask); + final E n = lvRefElement(nextBuffer, offset); if (null == n) { throw new IllegalStateException("new buffer must have at least one element"); } @@ -402,8 +440,8 @@ public E relaxedPoll() { final long index = lpConsumerIndex(); final long mask = consumerMask; - final long offset = modifiedCalcElementOffset(index, mask); - Object e = lvElement(buffer, offset); // LoadLoad + final long offset = modifiedCalcCircularRefElementOffset(index, mask); + Object e = lvRefElement(buffer, offset); if (e == null) { return null; } @@ -411,7 +449,7 @@ public E relaxedPoll() { final E[] nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } - soElement(buffer, offset, null); + soRefElement(buffer, offset, null); soConsumerIndex(index + 2); return (E) e; } @@ -423,8 +461,8 @@ public E relaxedPeek() { final long index = lpConsumerIndex(); final long mask = consumerMask; - final long offset = modifiedCalcElementOffset(index, mask); - Object e = lvElement(buffer, offset); // LoadLoad + final long offset = modifiedCalcCircularRefElementOffset(index, mask); + Object e = lvRefElement(buffer, offset); if (e == JUMP) { return newBufferPeek(nextBuffer(buffer, mask), index); } @@ -447,7 +485,11 @@ public int fill(Supplier s) { } @Override - public int fill(Supplier s, int batchSize) { + public int fill(Supplier s, int limit) { + if (null == s) throw new IllegalArgumentException("supplier is null"); + if (limit < 0) throw new IllegalArgumentException("limit is negative:" + limit); + if (limit == 0) return 0; + long mask; E[] buffer; long pIndex; @@ -471,9 +513,10 @@ public int fill(Supplier s, int batchSize) { // a successful CAS ties the ordering, lv(pIndex) -> [mask/buffer] -> cas(pIndex) // we want 'limit' slots, but will settle for whatever is visible to 'producerLimit' - long batchIndex = Math.min(producerLimit, pIndex + 2 * batchSize); + long batchIndex = + Math.min(producerLimit, pIndex + 2l * limit); // -> producerLimit >= batchIndex - if (pIndex >= producerLimit || producerLimit < batchIndex) { + if (pIndex >= producerLimit) { int result = offerSlowPath(mask, pIndex, producerLimit); switch (result) { case CONTINUE_TO_P_INDEX_CAS: @@ -496,23 +539,15 @@ public int fill(Supplier s, int batchSize) { } for (int i = 0; i < claimedSlots; i++) { - final long offset = modifiedCalcElementOffset(pIndex + 2 * i, mask); - soElement(buffer, offset, s.get()); + final long offset = modifiedCalcCircularRefElementOffset(pIndex + 2l * i, mask); + soRefElement(buffer, offset, s.get()); } return claimedSlots; } @Override - public void fill(Supplier s, WaitStrategy w, ExitCondition exit) { - - while (exit.keepRunning()) { - if (fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) == 0) { - int idleCounter = 0; - while (exit.keepRunning() && fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) == 0) { - idleCounter = w.idle(idleCounter); - } - } - } + public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) { + MessagePassingQueueUtil.fill(this, s, wait, exit); } @Override @@ -521,30 +556,13 @@ public int drain(Consumer c) { } @Override - public int drain(final Consumer c, final int limit) { - // Impl note: there are potentially some small gains to be had by manually inlining - // relaxedPoll() and hoisting - // reused fields out to reduce redundant reads. - int i = 0; - E m; - for (; i < limit && (m = relaxedPoll()) != null; i++) { - c.accept(m); - } - return i; + public int drain(Consumer c, int limit) { + return MessagePassingQueueUtil.drain(this, c, limit); } @Override - public void drain(Consumer c, WaitStrategy w, ExitCondition exit) { - int idleCounter = 0; - while (exit.keepRunning()) { - E e = relaxedPoll(); - if (e == null) { - idleCounter = w.idle(idleCounter); - continue; - } - idleCounter = 0; - c.accept(e); - } + public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) { + MessagePassingQueueUtil.drain(this, c, wait, exit); } /** @@ -559,21 +577,28 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) { */ @Override public Iterator iterator() { - return new WeakIterator(); + return new WeakIterator(consumerBuffer, lvConsumerIndex(), lvProducerIndex()); } - private final class WeakIterator implements Iterator { - + private static class WeakIterator implements Iterator { + private final long pIndex; private long nextIndex; private E nextElement; private E[] currentBuffer; - private int currentBufferLength; + private int mask; - WeakIterator() { - setBuffer(consumerBuffer); + WeakIterator(E[] currentBuffer, long cIndex, long pIndex) { + this.pIndex = pIndex >> 1; + this.nextIndex = cIndex >> 1; + setBuffer(currentBuffer); nextElement = getNext(); } + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + @Override public boolean hasNext() { return nextElement != null; @@ -581,37 +606,54 @@ public boolean hasNext() { @Override public E next() { - E e = nextElement; + final E e = nextElement; + if (e == null) { + throw new NoSuchElementException(); + } nextElement = getNext(); return e; } private void setBuffer(E[] buffer) { this.currentBuffer = buffer; - this.currentBufferLength = length(buffer); - this.nextIndex = 0; + this.mask = length(buffer) - 2; } private E getNext() { - while (true) { - while (nextIndex < currentBufferLength - 1) { - long offset = calcElementOffset(nextIndex++); - E e = lvElement(currentBuffer, offset); - if (e != null && e != JUMP) { - return e; - } + while (nextIndex < pIndex) { + long index = nextIndex++; + E e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask)); + // skip removed/not yet visible elements + if (e == null) { + continue; } - long offset = calcElementOffset(currentBufferLength - 1); - Object nextArray = lvElement(currentBuffer, offset); - if (nextArray == BUFFER_CONSUMED) { - // Consumer may have passed us, just jump to the current consumer buffer - setBuffer(consumerBuffer); - } else if (nextArray != null) { - setBuffer((E[]) nextArray); - } else { + + // not null && not JUMP -> found next element + if (e != JUMP) { + return e; + } + + // need to jump to the next buffer + int nextBufferIndex = mask + 1; + Object nextBuffer = lvRefElement(currentBuffer, calcRefElementOffset(nextBufferIndex)); + + if (nextBuffer == BUFFER_CONSUMED || nextBuffer == null) { + // Consumer may have passed us, or the next buffer is not visible yet: drop out early return null; } + + setBuffer((E[]) nextBuffer); + // now with the new array retry the load, it can't be a JUMP, but we need to repeat same + // index + e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask)); + // skip removed/not yet visible elements + if (e == null) { + continue; + } else { + return e; + } } + return null; } } @@ -620,7 +662,7 @@ private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier s int newBufferLength = getNextBufferSize(oldBuffer); final E[] newBuffer; try { - newBuffer = allocate(newBufferLength); + newBuffer = allocateRefArray(newBufferLength); } catch (OutOfMemoryError oom) { assert lvProducerIndex() == pIndex + 1; soProducerIndex(pIndex); @@ -631,11 +673,11 @@ private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier s final int newMask = (newBufferLength - 2) << 1; producerMask = newMask; - final long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask); - final long offsetInNew = modifiedCalcElementOffset(pIndex, newMask); + final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask); + final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask); - soElement(newBuffer, offsetInNew, e == null ? s.get() : e); // element in new array - soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer); // buffer linked + soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e); // element in new array + soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer); // buffer linked // ASSERT code final long cIndex = lvConsumerIndex(); @@ -652,7 +694,7 @@ private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier s // INDEX visible before ELEMENT, consistent with consumer expectation // make resize visible to consumer - soElement(oldBuffer, offsetInOld, JUMP); + soRefElement(oldBuffer, offsetInOld, JUMP); } /** @return next buffer size(inclusive of next array pointer) */ diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/CircularArrayOffsetCalculator.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/CircularArrayOffsetCalculator.java deleted file mode 100644 index d746fccbb..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/CircularArrayOffsetCalculator.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket.internal.jctools.queues; - -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.REF_ARRAY_BASE; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.REF_ELEMENT_SHIFT; - -import io.rsocket.internal.jctools.util.InternalAPI; - -@InternalAPI -public final class CircularArrayOffsetCalculator { - @SuppressWarnings("unchecked") - public static E[] allocate(int capacity) { - return (E[]) new Object[capacity]; - } - - /** - * @param index desirable element index - * @param mask (length - 1) - * @return the offset in bytes within the array for a given index. - */ - public static long calcElementOffset(long index, long mask) { - return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT); - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/IndexedQueueSizeUtil.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/IndexedQueueSizeUtil.java index 1b7d43166..40116bbe1 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/IndexedQueueSizeUtil.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/IndexedQueueSizeUtil.java @@ -13,10 +13,7 @@ */ package io.rsocket.internal.jctools.queues; -import io.rsocket.internal.jctools.util.InternalAPI; - -@InternalAPI -public final class IndexedQueueSizeUtil { +final class IndexedQueueSizeUtil { public static int size(IndexedQueue iq) { /* * It is possible for a thread to be interrupted or reschedule between the read of the producer and @@ -54,7 +51,6 @@ public static boolean isEmpty(IndexedQueue iq) { return (iq.lvConsumerIndex() == iq.lvProducerIndex()); } - @InternalAPI public interface IndexedQueue { long lvConsumerIndex(); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/LinkedArrayQueueUtil.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/LinkedArrayQueueUtil.java index 5e7831128..37651f351 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/LinkedArrayQueueUtil.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/LinkedArrayQueueUtil.java @@ -13,13 +13,11 @@ */ package io.rsocket.internal.jctools.queues; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.REF_ARRAY_BASE; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.REF_ELEMENT_SHIFT; +import static io.rsocket.internal.jctools.queues.UnsafeRefArrayAccess.REF_ARRAY_BASE; +import static io.rsocket.internal.jctools.queues.UnsafeRefArrayAccess.REF_ELEMENT_SHIFT; /** This is used for method substitution in the LinkedArray classes code generation. */ final class LinkedArrayQueueUtil { - private LinkedArrayQueueUtil() {} - static int length(Object[] buf) { return buf.length; } @@ -29,7 +27,7 @@ static int length(Object[] buf) { * is compensated for by reducing the element shift. The computation is constant folded, so * there's no cost. */ - static long modifiedCalcElementOffset(long index, long mask) { + static long modifiedCalcCircularRefElementOffset(long index, long mask) { return REF_ARRAY_BASE + ((index & mask) << (REF_ELEMENT_SHIFT - 1)); } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/LinkedQueueNode.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/LinkedQueueNode.java index 6ea69e330..72e78bb92 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/LinkedQueueNode.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/LinkedQueueNode.java @@ -13,8 +13,8 @@ */ package io.rsocket.internal.jctools.queues; -import static io.rsocket.internal.jctools.util.UnsafeAccess.UNSAFE; -import static io.rsocket.internal.jctools.util.UnsafeAccess.fieldOffset; +import static io.rsocket.internal.jctools.queues.UnsafeAccess.UNSAFE; +import static io.rsocket.internal.jctools.queues.UnsafeAccess.fieldOffset; final class LinkedQueueNode { private static final long NEXT_OFFSET = fieldOffset(LinkedQueueNode.class, "next"); @@ -53,6 +53,10 @@ public void soNext(LinkedQueueNode n) { UNSAFE.putOrderedObject(this, NEXT_OFFSET, n); } + public void spNext(LinkedQueueNode n) { + UNSAFE.putObject(this, NEXT_OFFSET, n); + } + public LinkedQueueNode lvNext() { return next; } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MessagePassingQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MessagePassingQueue.java index e0c3d0ee1..7a0fa901f 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MessagePassingQueue.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MessagePassingQueue.java @@ -13,55 +13,327 @@ */ package io.rsocket.internal.jctools.queues; +import java.util.Queue; + +/** + * Message passing queues are intended for concurrent method passing. A subset of {@link Queue} + * methods are provided with the same semantics, while further functionality which accomodates the + * concurrent usecase is also on offer. + * + *

Message passing queues provide happens before semantics to messages passed through, namely + * that writes made by the producer before offering the message are visible to the consuming thread + * after the message has been polled out of the queue. + * + * @param the event/message type + */ public interface MessagePassingQueue { int UNBOUNDED_CAPACITY = -1; interface Supplier { + /** + * This method will return the next value to be written to the queue. As such the queue + * implementations are commited to insert the value once the call is made. + * + *

Users should be aware that underlying queue implementations may upfront claim parts of the + * queue for batch operations and this will effect the view on the queue from the supplier + * method. In particular size and any offer methods may take the view that the full batch has + * already happened. + * + *

WARNING: this method is assumed to never throw. Breaking this assumption can lead + * to a broken queue. + * + *

WARNING: this method is assumed to never return {@code null}. Breaking this + * assumption can lead to a broken queue. + * + * @return new element, NEVER {@code null} + */ T get(); } interface Consumer { + /** + * This method will process an element already removed from the queue. This method is expected + * to never throw an exception. + * + *

Users should be aware that underlying queue implementations may upfront claim parts of the + * queue for batch operations and this will effect the view on the queue from the accept method. + * In particular size and any poll/peek methods may take the view that the full batch has + * already happened. + * + *

WARNING: this method is assumed to never throw. Breaking this assumption can lead + * to a broken queue. + * + * @param e not {@code null} + */ void accept(T e); } interface WaitStrategy { + /** + * This method can implement static or dynamic backoff. Dynamic backoff will rely on the counter + * for estimating how long the caller has been idling. The expected usage is: + * + *

+ * + *

+     * 
+     * int ic = 0;
+     * while(true) {
+     *   if(!isGodotArrived()) {
+     *     ic = w.idle(ic);
+     *     continue;
+     *   }
+     *   ic = 0;
+     *   // party with Godot until he goes again
+     * }
+     * 
+     * 
+ * + * @param idleCounter idle calls counter, managed by the idle method until reset + * @return new counter value to be used on subsequent idle cycle + */ int idle(int idleCounter); } interface ExitCondition { + /** + * This method should be implemented such that the flag read or determination cannot be hoisted + * out of a loop which notmally means a volatile load, but with JDK9 VarHandles may mean + * getOpaque. + * + * @return true as long as we should keep running + */ boolean keepRunning(); } + /** + * Called from a producer thread subject to the restrictions appropriate to the implementation and + * according to the {@link Queue#offer(Object)} interface. + * + * @param e not {@code null}, will throw NPE if it is + * @return true if element was inserted into the queue, false iff full + */ boolean offer(T e); + /** + * Called from the consumer thread subject to the restrictions appropriate to the implementation + * and according to the {@link Queue#poll()} interface. + * + * @return a message from the queue if one is available, {@code null} iff empty + */ T poll(); + /** + * Called from the consumer thread subject to the restrictions appropriate to the implementation + * and according to the {@link Queue#peek()} interface. + * + * @return a message from the queue if one is available, {@code null} iff empty + */ T peek(); + /** + * This method's accuracy is subject to concurrent modifications happening as the size is + * estimated and as such is a best effort rather than absolute value. For some implementations + * this method may be O(n) rather than O(1). + * + * @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less or + * equals to capacity (if bounded). + */ int size(); + /** + * Removes all items from the queue. Called from the consumer thread subject to the restrictions + * appropriate to the implementation and according to the {@link Queue#clear()} interface. + */ void clear(); + /** + * This method's accuracy is subject to concurrent modifications happening as the observation is + * carried out. + * + * @return true if empty, false otherwise + */ boolean isEmpty(); + /** + * @return the capacity of this queue or {@link MessagePassingQueue#UNBOUNDED_CAPACITY} if not + * bounded + */ int capacity(); + /** + * Called from a producer thread subject to the restrictions appropriate to the implementation. As + * opposed to {@link Queue#offer(Object)} this method may return false without the queue being + * full. + * + * @param e not {@code null}, will throw NPE if it is + * @return true if element was inserted into the queue, false if unable to offer + */ boolean relaxedOffer(T e); + /** + * Called from the consumer thread subject to the restrictions appropriate to the implementation. + * As opposed to {@link Queue#poll()} this method may return {@code null} without the queue being + * empty. + * + * @return a message from the queue if one is available, {@code null} if unable to poll + */ T relaxedPoll(); + /** + * Called from the consumer thread subject to the restrictions appropriate to the implementation. + * As opposed to {@link Queue#peek()} this method may return {@code null} without the queue being + * empty. + * + * @return a message from the queue if one is available, {@code null} if unable to peek + */ T relaxedPeek(); - int drain(Consumer c); - - int fill(Supplier s); - + /** + * Remove up to limit elements from the queue and hand to consume. This should be + * semantically similar to: + * + *

+ * + *

{@code
+   * M m;
+   * int i = 0;
+   * for(;i < limit && (m = relaxedPoll()) != null; i++){
+   *   c.accept(m);
+   * }
+   * return i;
+   * }
+ * + *

There's no strong commitment to the queue being empty at the end of a drain. Called from a + * consumer thread subject to the restrictions appropriate to the implementation. + * + *

WARNING: Explicit assumptions are made with regards to {@link Consumer#accept} make + * sure you have read and understood these before using this method. + * + * @return the number of polled elements + * @throws IllegalArgumentException c is {@code null} + * @throws IllegalArgumentException if limit is negative + */ int drain(Consumer c, int limit); + /** + * Stuff the queue with up to limit elements from the supplier. Semantically similar to: + * + *

+ * + *

{@code
+   * for(int i=0; i < limit && relaxedOffer(s.get()); i++);
+   * }
+ * + *

There's no strong commitment to the queue being full at the end of a fill. Called from a + * producer thread subject to the restrictions appropriate to the implementation. + * + *

WARNING: Explicit assumptions are made with regards to {@link Supplier#get} make sure + * you have read and understood these before using this method. + * + * @return the number of offered elements + * @throws IllegalArgumentException s is {@code null} + * @throws IllegalArgumentException if limit is negative + */ int fill(Supplier s, int limit); + /** + * Remove all available item from the queue and hand to consume. This should be semantically + * similar to: + * + *

+   * M m;
+   * while((m = relaxedPoll()) != null){
+   * c.accept(m);
+   * }
+   * 
+ * + * There's no strong commitment to the queue being empty at the end of a drain. Called from a + * consumer thread subject to the restrictions appropriate to the implementation. + * + *

WARNING: Explicit assumptions are made with regards to {@link Consumer#accept} make + * sure you have read and understood these before using this method. + * + * @return the number of polled elements + * @throws IllegalArgumentException c is {@code null} + */ + int drain(Consumer c); + + /** + * Stuff the queue with elements from the supplier. Semantically similar to: + * + *

+   * while(relaxedOffer(s.get());
+   * 
+ * + * There's no strong commitment to the queue being full at the end of a fill. Called from a + * producer thread subject to the restrictions appropriate to the implementation. + * + *

Unbounded queues will fill up the queue with a fixed amount rather than fill up to oblivion. + * + *

WARNING: Explicit assumptions are made with regards to {@link Supplier#get} make sure + * you have read and understood these before using this method. + * + * @return the number of offered elements + * @throws IllegalArgumentException s is {@code null} + */ + int fill(Supplier s); + + /** + * Remove elements from the queue and hand to consume forever. Semantically similar to: + * + *

+ * + *

+   *  int idleCounter = 0;
+   *  while (exit.keepRunning()) {
+   *      E e = relaxedPoll();
+   *      if(e==null){
+   *          idleCounter = wait.idle(idleCounter);
+   *          continue;
+   *      }
+   *      idleCounter = 0;
+   *      c.accept(e);
+   *  }
+   * 
+ * + *

Called from a consumer thread subject to the restrictions appropriate to the implementation. + * + *

WARNING: Explicit assumptions are made with regards to {@link Consumer#accept} make + * sure you have read and understood these before using this method. + * + * @throws IllegalArgumentException c OR wait OR exit are {@code null} + */ void drain(Consumer c, WaitStrategy wait, ExitCondition exit); + /** + * Stuff the queue with elements from the supplier forever. Semantically similar to: + * + *

+ * + *

+   * 
+   *  int idleCounter = 0;
+   *  while (exit.keepRunning()) {
+   *      E e = s.get();
+   *      while (!relaxedOffer(e)) {
+   *          idleCounter = wait.idle(idleCounter);
+   *          continue;
+   *      }
+   *      idleCounter = 0;
+   *  }
+   * 
+   * 
+ * + *

Called from a producer thread subject to the restrictions appropriate to the implementation. + * The main difference being that implementors MUST assure room in the queue is available BEFORE + * calling {@link Supplier#get}. + * + *

WARNING: Explicit assumptions are made with regards to {@link Supplier#get} make sure + * you have read and understood these before using this method. + * + * @throws IllegalArgumentException s OR wait OR exit are {@code null} + */ void fill(Supplier s, WaitStrategy wait, ExitCondition exit); } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MessagePassingQueueUtil.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MessagePassingQueueUtil.java new file mode 100644 index 000000000..cb03364d8 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MessagePassingQueueUtil.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.internal.jctools.queues; + +import io.rsocket.internal.jctools.queues.MessagePassingQueue.Consumer; +import io.rsocket.internal.jctools.queues.MessagePassingQueue.ExitCondition; +import io.rsocket.internal.jctools.queues.MessagePassingQueue.Supplier; +import io.rsocket.internal.jctools.queues.MessagePassingQueue.WaitStrategy; + +final class MessagePassingQueueUtil { + public static int drain(MessagePassingQueue queue, Consumer c, int limit) { + if (null == c) throw new IllegalArgumentException("c is null"); + if (limit < 0) throw new IllegalArgumentException("limit is negative: " + limit); + if (limit == 0) return 0; + E e; + int i = 0; + for (; i < limit && (e = queue.relaxedPoll()) != null; i++) { + c.accept(e); + } + return i; + } + + public static int drain(MessagePassingQueue queue, Consumer c) { + if (null == c) throw new IllegalArgumentException("c is null"); + E e; + int i = 0; + while ((e = queue.relaxedPoll()) != null) { + i++; + c.accept(e); + } + return i; + } + + public static void drain( + MessagePassingQueue queue, Consumer c, WaitStrategy wait, ExitCondition exit) { + if (null == c) throw new IllegalArgumentException("c is null"); + if (null == wait) throw new IllegalArgumentException("wait is null"); + if (null == exit) throw new IllegalArgumentException("exit condition is null"); + + int idleCounter = 0; + while (exit.keepRunning()) { + final E e = queue.relaxedPoll(); + if (e == null) { + idleCounter = wait.idle(idleCounter); + continue; + } + idleCounter = 0; + c.accept(e); + } + } + + public static void fill( + MessagePassingQueue q, Supplier s, WaitStrategy wait, ExitCondition exit) { + if (null == wait) throw new IllegalArgumentException("waiter is null"); + if (null == exit) throw new IllegalArgumentException("exit condition is null"); + + int idleCounter = 0; + while (exit.keepRunning()) { + if (q.fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) == 0) { + idleCounter = wait.idle(idleCounter); + continue; + } + idleCounter = 0; + } + } + + public static int fillBounded(MessagePassingQueue q, Supplier s) { + return fillInBatchesToLimit(q, s, PortableJvmInfo.RECOMENDED_OFFER_BATCH, q.capacity()); + } + + public static int fillInBatchesToLimit( + MessagePassingQueue q, Supplier s, int batch, int limit) { + long result = + 0; // result is a long because we want to have a safepoint check at regular intervals + do { + final int filled = q.fill(s, batch); + if (filled == 0) { + return (int) result; + } + result += filled; + } while (result <= limit); + return (int) result; + } + + public static int fillUnbounded(MessagePassingQueue q, Supplier s) { + return fillInBatchesToLimit(q, s, PortableJvmInfo.RECOMENDED_OFFER_BATCH, 4096); + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MpscUnboundedArrayQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MpscUnboundedArrayQueue.java index 59eab33a1..179070be4 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MpscUnboundedArrayQueue.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/MpscUnboundedArrayQueue.java @@ -14,20 +14,31 @@ package io.rsocket.internal.jctools.queues; import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.length; - -import io.rsocket.internal.jctools.util.PortableJvmInfo; +import static io.rsocket.internal.jctools.queues.MessagePassingQueueUtil.fillUnbounded; /** * An MPSC array queue which starts at initialCapacity and grows indefinitely in linked * chunks of the initial size. The queue grows only when the current chunk is full and elements are * not copied on resize, instead a link to the new chunk is stored in the old chunk for the consumer - * to follow.
- * - * @param + * to follow. */ public class MpscUnboundedArrayQueue extends BaseMpscLinkedArrayQueue { - long p0, p1, p2, p3, p4, p5, p6, p7; - long p10, p11, p12, p13, p14, p15, p16, p17; + byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b + byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b + byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b + byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b + byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b + byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b + byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b + byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b + byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b + byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b + byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b + byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b + byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b + byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b + byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b + byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b public MpscUnboundedArrayQueue(int chunkSize) { super(chunkSize); @@ -50,17 +61,7 @@ public int drain(Consumer c) { @Override public int fill(Supplier s) { - long result = - 0; // result is a long because we want to have a safepoint check at regular intervals - final int capacity = 4096; - do { - final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH); - if (filled == 0) { - return (int) result; - } - result += filled; - } while (result <= capacity); - return (int) result; + return fillUnbounded(this, s); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/PortableJvmInfo.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/PortableJvmInfo.java similarity index 90% rename from rsocket-core/src/main/java/io/rsocket/internal/jctools/util/PortableJvmInfo.java rename to rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/PortableJvmInfo.java index 2d567d60d..f037857e8 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/PortableJvmInfo.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/PortableJvmInfo.java @@ -11,11 +11,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.rsocket.internal.jctools.util; +package io.rsocket.internal.jctools.queues; /** JVM Information that is standard and available on all JVMs (i.e. does not use unsafe) */ -@InternalAPI -public interface PortableJvmInfo { +interface PortableJvmInfo { int CACHE_LINE_SIZE = Integer.getInteger("jctools.cacheLineSize", 64); int CPUs = Runtime.getRuntime().availableProcessors(); int RECOMENDED_OFFER_BATCH = CPUs * 4; diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/Pow2.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/Pow2.java similarity index 96% rename from rsocket-core/src/main/java/io/rsocket/internal/jctools/util/Pow2.java rename to rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/Pow2.java index d8c66d89e..282a22f02 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/Pow2.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/Pow2.java @@ -11,11 +11,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.rsocket.internal.jctools.util; +package io.rsocket.internal.jctools.queues; /** Power of 2 utility functions. */ -@InternalAPI -public final class Pow2 { +final class Pow2 { public static final int MAX_POW2 = 1 << 30; /** diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/RangeUtil.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/RangeUtil.java similarity index 94% rename from rsocket-core/src/main/java/io/rsocket/internal/jctools/util/RangeUtil.java rename to rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/RangeUtil.java index 77a0582ca..3adcb2f3c 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/RangeUtil.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/RangeUtil.java @@ -11,10 +11,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.rsocket.internal.jctools.util; +package io.rsocket.internal.jctools.queues; -@InternalAPI -public final class RangeUtil { +final class RangeUtil { public static long checkPositive(long n, String name) { if (n <= 0) { throw new IllegalArgumentException(name + ": " + n + " (expected: > 0)"); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/UnsafeAccess.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/UnsafeAccess.java old mode 100755 new mode 100644 similarity index 71% rename from rsocket-core/src/main/java/io/rsocket/internal/jctools/util/UnsafeAccess.java rename to rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/UnsafeAccess.java index 793e64505..c99aeb689 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/UnsafeAccess.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/UnsafeAccess.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.rsocket.internal.jctools.util; +package io.rsocket.internal.jctools.queues; import java.lang.reflect.Constructor; import java.lang.reflect.Field; @@ -33,41 +33,56 @@ * * @author nitsanw */ -@InternalAPI -public class UnsafeAccess { - public static final boolean SUPPORTS_GET_AND_SET; +class UnsafeAccess { + public static final boolean SUPPORTS_GET_AND_SET_REF; + public static final boolean SUPPORTS_GET_AND_ADD_LONG; public static final Unsafe UNSAFE; static { + UNSAFE = getUnsafe(); + SUPPORTS_GET_AND_SET_REF = hasGetAndSetSupport(); + SUPPORTS_GET_AND_ADD_LONG = hasGetAndAddLongSupport(); + } + + private static Unsafe getUnsafe() { Unsafe instance; try { final Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); instance = (Unsafe) field.get(null); } catch (Exception ignored) { - // Some platforms, notably Android, might not have a sun.misc.Unsafe - // implementation with a private `theUnsafe` static instance. In this - // case we can try and call the default constructor, which proves - // sufficient for Android usage. + // Some platforms, notably Android, might not have a sun.misc.Unsafe implementation with a + // private + // `theUnsafe` static instance. In this case we can try to call the default constructor, which + // is sufficient + // for Android usage. try { Constructor c = Unsafe.class.getDeclaredConstructor(); c.setAccessible(true); instance = c.newInstance(); } catch (Exception e) { - SUPPORTS_GET_AND_SET = false; throw new RuntimeException(e); } } + return instance; + } - boolean getAndSetSupport = false; + private static boolean hasGetAndSetSupport() { try { Unsafe.class.getMethod("getAndSetObject", Object.class, Long.TYPE, Object.class); - getAndSetSupport = true; + return true; } catch (Exception ignored) { } + return false; + } - UNSAFE = instance; - SUPPORTS_GET_AND_SET = getAndSetSupport; + private static boolean hasGetAndAddLongSupport() { + try { + Unsafe.class.getMethod("getAndAddLong", Object.class, Long.TYPE, Long.TYPE); + return true; + } catch (Exception ignored) { + } + return false; } public static long fieldOffset(Class clz, String fieldName) throws RuntimeException { diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/UnsafeRefArrayAccess.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/UnsafeRefArrayAccess.java similarity index 57% rename from rsocket-core/src/main/java/io/rsocket/internal/jctools/util/UnsafeRefArrayAccess.java rename to rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/UnsafeRefArrayAccess.java index d8309c5c5..c734a9914 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/UnsafeRefArrayAccess.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/UnsafeRefArrayAccess.java @@ -11,32 +11,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.rsocket.internal.jctools.util; +package io.rsocket.internal.jctools.queues; -import static io.rsocket.internal.jctools.util.UnsafeAccess.UNSAFE; +import static io.rsocket.internal.jctools.queues.UnsafeAccess.UNSAFE; -/** - * A concurrent access enabling class used by circular array based queues this class exposes an - * offset computation method along with differently memory fenced load/store methods into the - * underlying array. The class is pre-padded and the array is padded on either side to help with - * False sharing prvention. It is expected theat subclasses handle post padding. - * - *

Offset calculation is separate from access to enable the reuse of a give compute offset. - * - *

Load/Store methods using a buffer parameter are provided to allow the prevention of - * final field reload after a LoadLoad barrier. - * - *

- * - * @author nitsanw - */ -@InternalAPI -public final class UnsafeRefArrayAccess { +final class UnsafeRefArrayAccess { public static final long REF_ARRAY_BASE; public static final int REF_ELEMENT_SHIFT; static { - final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); + final int scale = UNSAFE.arrayIndexScale(Object[].class); if (4 == scale) { REF_ELEMENT_SHIFT = 2; } else if (8 == scale) { @@ -44,28 +28,28 @@ public final class UnsafeRefArrayAccess { } else { throw new IllegalStateException("Unknown pointer size: " + scale); } - REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class); + REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class); } /** * A plain store (no ordering/fences) of an element to a given offset * * @param buffer this.buffer - * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} + * @param offset computed via {@link UnsafeRefArrayAccess#calcRefElementOffset(long)} * @param e an orderly kitty */ - public static void spElement(E[] buffer, long offset, E e) { + public static void spRefElement(E[] buffer, long offset, E e) { UNSAFE.putObject(buffer, offset, e); } /** - * An ordered store(store + StoreStore barrier) of an element to a given offset + * An ordered store of an element to a given offset * * @param buffer this.buffer - * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset} + * @param offset computed via {@link UnsafeRefArrayAccess#calcCircularRefElementOffset} * @param e an orderly kitty */ - public static void soElement(E[] buffer, long offset, E e) { + public static void soRefElement(E[] buffer, long offset, E e) { UNSAFE.putOrderedObject(buffer, offset, e); } @@ -73,31 +57,48 @@ public static void soElement(E[] buffer, long offset, E e) { * A plain load (no ordering/fences) of an element from a given offset. * * @param buffer this.buffer - * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} + * @param offset computed via {@link UnsafeRefArrayAccess#calcRefElementOffset(long)} * @return the element at the offset */ @SuppressWarnings("unchecked") - public static E lpElement(E[] buffer, long offset) { + public static E lpRefElement(E[] buffer, long offset) { return (E) UNSAFE.getObject(buffer, offset); } /** - * A volatile load (load + LoadLoad barrier) of an element from a given offset. + * A volatile load of an element from a given offset. * * @param buffer this.buffer - * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} + * @param offset computed via {@link UnsafeRefArrayAccess#calcRefElementOffset(long)} * @return the element at the offset */ @SuppressWarnings("unchecked") - public static E lvElement(E[] buffer, long offset) { + public static E lvRefElement(E[] buffer, long offset) { return (E) UNSAFE.getObjectVolatile(buffer, offset); } /** * @param index desirable element index - * @return the offset in bytes within the array for a given index. + * @return the offset in bytes within the array for a given index */ - public static long calcElementOffset(long index) { + public static long calcRefElementOffset(long index) { return REF_ARRAY_BASE + (index << REF_ELEMENT_SHIFT); } + + /** + * Note: circular arrays are assumed a power of 2 in length and the `mask` is (length - 1). + * + * @param index desirable element index + * @param mask (length - 1) + * @return the offset in bytes within the circular array for a given index + */ + public static long calcCircularRefElementOffset(long index, long mask) { + return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT); + } + + /** This makes for an easier time generating the atomic queues, and removes some warnings. */ + @SuppressWarnings("unchecked") + public static E[] allocateRefArray(int capacity) { + return (E[]) new Object[capacity]; + } } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/InternalAPI.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/InternalAPI.java deleted file mode 100644 index f233e9597..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/util/InternalAPI.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket.internal.jctools.util; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * This annotation marks classes and methods which may be public for any reason (to support better - * testing or reduce code duplication) but are not intended as public API and may change between - * releases without the change being considered a breaking API change (a major release). - */ -@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR}) -@Retention(RetentionPolicy.SOURCE) -public @interface InternalAPI {}