From c4a2762b73283ddc2662dd6bd29c8ced7e83a0bd Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Sep 2015 20:11:00 -0700 Subject: [PATCH 1/5] Pillage Concat from RxJava v2 RxJava v2 isn't ready to depend on, so this pillages the necessary code for OperatorConcatMap to work which I'll adjust to work for the fragmentation requirements. --- .../rx/AppendOnlyLinkedArrayList.java | 125 ++++++++ .../internal/rx/BaseArrayQueue.java | 131 ++++++++ .../internal/rx/BaseLinkedQueue.java | 94 ++++++ .../internal/rx/LinkedQueueNode.java | 58 ++++ .../internal/rx/MpscLinkedQueue.java | 112 +++++++ .../internal/rx/NotificationLite.java | 207 +++++++++++++ .../internal/rx/OperatorConcatMap.java | 202 +++++++++++++ .../io/reactivesocket/internal/rx/Pow2.java | 46 +++ .../internal/rx/QueueDrainHelper.java | 280 ++++++++++++++++++ .../io/reactivesocket/internal/rx/README.md | 3 + .../internal/rx/SerializedSubscriber.java | 176 +++++++++++ .../internal/rx/SpscArrayQueue.java | 133 +++++++++ .../internal/rx/SpscExactArrayQueue.java | 164 ++++++++++ .../internal/rx/SubscriptionArbiter.java | 191 ++++++++++++ 14 files changed, 1922 insertions(+) create mode 100644 src/main/java/io/reactivesocket/internal/rx/AppendOnlyLinkedArrayList.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/BaseArrayQueue.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/BaseLinkedQueue.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/LinkedQueueNode.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/MpscLinkedQueue.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/NotificationLite.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/OperatorConcatMap.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/Pow2.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/QueueDrainHelper.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/README.md create mode 100644 src/main/java/io/reactivesocket/internal/rx/SerializedSubscriber.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/SpscArrayQueue.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/SpscExactArrayQueue.java create mode 100644 src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java diff --git a/src/main/java/io/reactivesocket/internal/rx/AppendOnlyLinkedArrayList.java b/src/main/java/io/reactivesocket/internal/rx/AppendOnlyLinkedArrayList.java new file mode 100644 index 000000000..0b1ee24b7 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/AppendOnlyLinkedArrayList.java @@ -0,0 +1,125 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal.rx; + +import java.util.function.*; + +/** + * A linked-array-list implementation that only supports appending and consumption. + * + * @param the value type + */ +public class AppendOnlyLinkedArrayList { + final int capacity; + Object[] head; + Object[] tail; + int offset; + + /** + * Constructs an empty list with a per-link capacity + * @param capacity the capacity of each link + */ + public AppendOnlyLinkedArrayList(int capacity) { + this.capacity = capacity; + this.head = new Object[capacity + 1]; + this.tail = head; + } + + /** + * Append a non-null value to the list. + *

Don't add null to the list! + * @param value the value to append + */ + public void add(T value) { + final int c = capacity; + int o = offset; + if (o == c) { + Object[] next = new Object[c + 1]; + tail[c] = next; + tail = next; + o = 0; + } + tail[o] = value; + offset = o + 1; + } + + /** + * Set a value as the first element of the list. + * @param value the value to set + */ + public void setFirst(T value) { + head[0] = value; + } + + /** + * Loops through all elements of the list. + * @param consumer the consumer of elements + */ + @SuppressWarnings("unchecked") + public void forEach(Consumer consumer) { + Object[] a = head; + final int c = capacity; + while (a != null) { + for (int i = 0; i < c; i++) { + Object o = a[i]; + if (o == null) { + return; + } + consumer.accept((T)o); + } + a = (Object[])a[c]; + } + } + + /** + * Loops over all elements of the array until a null element is encountered or + * the given predicate returns true. + * @param consumer the consumer of values that returns true if the forEach should terminate + */ + @SuppressWarnings("unchecked") + public void forEachWhile(Predicate consumer) { + Object[] a = head; + final int c = capacity; + while (a != null) { + for (int i = 0; i < c; i++) { + Object o = a[i]; + if (o == null) { + return; + } + if (consumer.test((T)o)) { + return; + } + } + a = (Object[])a[c]; + } + } + + @SuppressWarnings("unchecked") + public void forEachWhile(S state, BiPredicate consumer) { + Object[] a = head; + final int c = capacity; + while (a != null) { + for (int i = 0; i < c; i++) { + Object o = a[i]; + if (o == null) { + return; + } + if (consumer.test(state, (T)o)) { + return; + } + } + a = (Object[])a[c]; + } + } +} diff --git a/src/main/java/io/reactivesocket/internal/rx/BaseArrayQueue.java b/src/main/java/io/reactivesocket/internal/rx/BaseArrayQueue.java new file mode 100644 index 000000000..214aa00b2 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/BaseArrayQueue.java @@ -0,0 +1,131 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReferenceArray; + +abstract class BaseArrayQueue extends AtomicReferenceArray implements Queue { + /** */ + private static final long serialVersionUID = 5238363267841964068L; + protected final int mask; + public BaseArrayQueue(int capacity) { + super(Pow2.roundToPowerOfTwo(capacity)); + this.mask = length() - 1; + } + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + @Override + public void clear() { + // we have to test isEmpty because of the weaker poll() guarantee + while (poll() != null || !isEmpty()) + ; + } + protected final int calcElementOffset(long index, int mask) { + return (int)index & mask; + } + protected final int calcElementOffset(long index) { + return (int)index & mask; + } + protected final E lvElement(AtomicReferenceArray buffer, int offset) { + return buffer.get(offset); + } + protected final E lpElement(AtomicReferenceArray buffer, int offset) { + return buffer.get(offset); // no weaker form available + } + protected final E lpElement(int offset) { + return get(offset); // no weaker form available + } + protected final void spElement(AtomicReferenceArray buffer, int offset, E value) { + buffer.lazySet(offset, value); // no weaker form available + } + protected final void spElement(int offset, E value) { + lazySet(offset, value); // no weaker form available + } + protected final void soElement(AtomicReferenceArray buffer, int offset, E value) { + buffer.lazySet(offset, value); + } + protected final void soElement(int offset, E value) { + lazySet(offset, value); + } + protected final void svElement(AtomicReferenceArray buffer, int offset, E value) { + buffer.set(offset, value); + } + protected final E lvElement(int offset) { + return get(offset); + } + + @Override + public boolean add(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public E remove() { + throw new UnsupportedOperationException(); + } + + @Override + public E element() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } +} + diff --git a/src/main/java/io/reactivesocket/internal/rx/BaseLinkedQueue.java b/src/main/java/io/reactivesocket/internal/rx/BaseLinkedQueue.java new file mode 100644 index 000000000..bc1047ae2 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/BaseLinkedQueue.java @@ -0,0 +1,94 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + +abstract class BaseLinkedQueue extends AbstractQueue { + private final AtomicReference> producerNode; + private final AtomicReference> consumerNode; + public BaseLinkedQueue() { + producerNode = new AtomicReference<>(); + consumerNode = new AtomicReference<>(); + } + protected final LinkedQueueNode lvProducerNode() { + return producerNode.get(); + } + protected final LinkedQueueNode lpProducerNode() { + return producerNode.get(); + } + protected final void spProducerNode(LinkedQueueNode node) { + producerNode.lazySet(node); + } + protected final LinkedQueueNode xchgProducerNode(LinkedQueueNode node) { + return producerNode.getAndSet(node); + } + protected final LinkedQueueNode lvConsumerNode() { + return consumerNode.get(); + } + + protected final LinkedQueueNode lpConsumerNode() { + return consumerNode.get(); + } + protected final void spConsumerNode(LinkedQueueNode node) { + consumerNode.lazySet(node); + } + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * This is an O(n) operation as we run through all the nodes and count them.
+ * + * @see java.util.Queue#size() + */ + @Override + public final int size() { + LinkedQueueNode chaserNode = lvConsumerNode(); + final LinkedQueueNode producerNode = lvProducerNode(); + int size = 0; + // must chase the nodes all the way to the producer node, but there's no need to chase a moving target. + while (chaserNode != producerNode && size < Integer.MAX_VALUE) { + LinkedQueueNode next; + while((next = chaserNode.lvNext()) == null); + chaserNode = next; + size++; + } + return size; + } + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe + * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to + * be null. + * + * @see MessagePassingQueue#isEmpty() + */ + @Override + public final boolean isEmpty() { + return lvConsumerNode() == lvProducerNode(); + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivesocket/internal/rx/LinkedQueueNode.java b/src/main/java/io/reactivesocket/internal/rx/LinkedQueueNode.java new file mode 100644 index 000000000..bc03d6f0c --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/LinkedQueueNode.java @@ -0,0 +1,58 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.concurrent.atomic.AtomicReference; + +public final class LinkedQueueNode extends AtomicReference> { + /** */ + private static final long serialVersionUID = 2404266111789071508L; + private E value; + LinkedQueueNode() { + } + LinkedQueueNode(E val) { + spValue(val); + } + /** + * Gets the current value and nulls out the reference to it from this node. + * + * @return value + */ + public E getAndNullValue() { + E temp = lpValue(); + spValue(null); + return temp; + } + + public E lpValue() { + return value; + } + + public void spValue(E newValue) { + value = newValue; + } + + public void soNext(LinkedQueueNode n) { + lazySet(n); + } + + public LinkedQueueNode lvNext() { + return get(); + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivesocket/internal/rx/MpscLinkedQueue.java b/src/main/java/io/reactivesocket/internal/rx/MpscLinkedQueue.java new file mode 100644 index 000000000..45741ac38 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/MpscLinkedQueue.java @@ -0,0 +1,112 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +/** + * A multi-producer single consumer unbounded queue. + */ +public final class MpscLinkedQueue extends BaseLinkedQueue { + + public MpscLinkedQueue() { + super(); + LinkedQueueNode node = new LinkedQueueNode<>(); + spConsumerNode(node); + xchgProducerNode(node);// this ensures correct construction: StoreLoad + } + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Offer is allowed from multiple threads.
+ * Offer allocates a new node and: + *

    + *
  1. Swaps it atomically with current producer node (only one producer 'wins') + *
  2. Sets the new node as the node following from the swapped producer node + *
+ * This works because each producer is guaranteed to 'plant' a new node and link the old node. No 2 producers can + * get the same producer node as part of XCHG guarantee. + * + * @see MessagePassingQueue#offer(Object) + * @see java.util.Queue#offer(java.lang.Object) + */ + @Override + public final boolean offer(final T nextValue) { + final LinkedQueueNode nextNode = new LinkedQueueNode<>(nextValue); + final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode); + // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed + // and completes the store in prev.next. + prevProducerNode.soNext(nextNode); // StoreStore + return true; + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Poll is allowed from a SINGLE thread.
+ * Poll reads the next node from the consumerNode and: + *

    + *
  1. If it is null, the queue is assumed empty (though it might not be). + *
  2. If it is not null set it as the consumer node and return it's now evacuated value. + *
+ * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null + * values are not allowed to be offered this is the only node with it's value set to null at any one time. + * + * @see MessagePassingQueue#poll() + * @see java.util.Queue#poll() + */ + @Override + public final T poll() { + LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright + LinkedQueueNode nextNode = currConsumerNode.lvNext(); + if (nextNode != null) { + // we have to null out the value because we are going to hang on to the node + final T nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; + } + else if (currConsumerNode != lvProducerNode()) { + // spin, we are no longer wait free + while((nextNode = currConsumerNode.lvNext()) == null); + // got the next node... + + // we have to null out the value because we are going to hang on to the node + final T nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; + } + return null; + } + + @Override + public final T peek() { + LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright + LinkedQueueNode nextNode = currConsumerNode.lvNext(); + if (nextNode != null) { + return nextNode.lpValue(); + } else + if (currConsumerNode != lvProducerNode()) { + // spin, we are no longer wait free + while ((nextNode = currConsumerNode.lvNext()) == null); + // got the next node... + return nextNode.lpValue(); + } + return null; + } +} diff --git a/src/main/java/io/reactivesocket/internal/rx/NotificationLite.java b/src/main/java/io/reactivesocket/internal/rx/NotificationLite.java new file mode 100644 index 000000000..2091ed836 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/NotificationLite.java @@ -0,0 +1,207 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.internal.rx; + +import java.io.Serializable; + +import org.reactivestreams.*; + +/** + * Lightweight notification handling utility class. + */ +public enum NotificationLite { + // No instances + ; + + /** + * Indicates a completion notification. + */ + private enum Complete { + INSTANCE; + @Override + public String toString() { + return "NotificationLite.Complete"; + }; + } + + /** + * Wraps a Throwable. + */ + private static final class ErrorNotification implements Serializable { + /** */ + private static final long serialVersionUID = -8759979445933046293L; + final Throwable e; + ErrorNotification(Throwable e) { + this.e = e; + } + + @Override + public String toString() { + return "NotificationLite.Error[" + e + "]"; + } + } + + /** + * Wraps a Subscription. + */ + private static final class SubscriptionNotification implements Serializable { + /** */ + private static final long serialVersionUID = -1322257508628817540L; + final Subscription s; + SubscriptionNotification(Subscription s) { + this.s = s; + } + + @Override + public String toString() { + return "NotificationLite.Subscription[" + s + "]"; + } + } + + /** + * Converts a value into a notification value. + * @param value the value to convert + * @return the notification representing the value + */ + public static Object next(T value) { + return value; + } + + /** + * Returns a complete notification. + * @return a complete notification + */ + public static Object complete() { + return Complete.INSTANCE; + } + + /** + * Converts a Throwable into a notification value. + * @param e the Throwable to convert + * @return the notification representing the Throwable + */ + public static Object error(Throwable e) { + return new ErrorNotification(e); + } + + /** + * Converts a Subscription into a notification value. + * @param e the Subscription to convert + * @return the notification representing the Subscription + */ + public static Object subscription(Subscription s) { + return new SubscriptionNotification(s); + } + + /** + * Checks if the given object represents a complete notification. + * @param o the object to check + * @return true if the object represents a complete notification + */ + public static boolean isComplete(Object o) { + return o == Complete.INSTANCE; + } + + /** + * Checks if the given object represents a error notification. + * @param o the object to check + * @return true if the object represents a error notification + */ + public static boolean isError(Object o) { + return o instanceof ErrorNotification; + } + + /** + * Checks if the given object represents a subscription notification. + * @param o the object to check + * @return true if the object represents a subscription notification + */ + public static boolean isSubscription(Object o) { + return o instanceof SubscriptionNotification; + } + + /** + * Extracts the value from the notification object + * @param o the notification object + * @return the extracted value + */ + @SuppressWarnings("unchecked") + public static T getValue(Object o) { + return (T)o; + } + + /** + * Extracts the Throwable from the notification object + * @param o the notification object + * @return the extracted Throwable + */ + public static Throwable getError(Object o) { + return ((ErrorNotification)o).e; + } + + /** + * Extracts the Subscription from the notification object + * @param o the notification object + * @return the extracted Subscription + */ + public static Subscription getSubscription(Object o) { + return ((SubscriptionNotification)o).s; + } + + /** + * Calls the appropriate Subscriber method based on the type of the notification. + *

Does not check for a subscription notification, see {@link #acceptFull(Object, Subscriber)}. + * @param o the notification object + * @param s the subscriber to call methods on + * @return true if the notification was a terminal event (i.e., complete or error) + * @see #acceptFull(Object, Subscriber) + */ + @SuppressWarnings("unchecked") + public static boolean accept(Object o, Subscriber s) { + if (o == Complete.INSTANCE) { + s.onComplete(); + return true; + } else + if (o instanceof ErrorNotification) { + s.onError(((ErrorNotification)o).e); + return true; + } + s.onNext((T)o); + return false; + } + + /** + * Calls the appropriate Subscriber method based on the type of the notification. + * @param o the notification object + * @param s the subscriber to call methods on + * @return true if the notification was a terminal event (i.e., complete or error) + * @see #accept(Object, Subscriber) + */ + @SuppressWarnings("unchecked") + public static boolean acceptFull(Object o, Subscriber s) { + if (o == Complete.INSTANCE) { + s.onComplete(); + return true; + } else + if (o instanceof ErrorNotification) { + s.onError(((ErrorNotification)o).e); + return true; + } else + if (o instanceof SubscriptionNotification) { + s.onSubscribe(((SubscriptionNotification)o).s); + return false; + } + s.onNext((T)o); + return false; + } +} diff --git a/src/main/java/io/reactivesocket/internal/rx/OperatorConcatMap.java b/src/main/java/io/reactivesocket/internal/rx/OperatorConcatMap.java new file mode 100644 index 000000000..641d7cf2b --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/OperatorConcatMap.java @@ -0,0 +1,202 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.internal.rx; + +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import org.reactivestreams.*; + +public final class OperatorConcatMap { + final Function> mapper; + final int bufferSize; + public OperatorConcatMap(Function> mapper, int bufferSize) { + this.mapper = mapper; + this.bufferSize = bufferSize; + } + + public Subscriber apply(Subscriber s) { + SerializedSubscriber ssub = new SerializedSubscriber<>(s); + SubscriptionArbiter sa = new SubscriptionArbiter(); + ssub.onSubscribe(sa); + return new SourceSubscriber<>(ssub, sa, mapper, bufferSize); + } + + static final class SourceSubscriber extends AtomicInteger implements Subscriber { + /** */ + private static final long serialVersionUID = 8828587559905699186L; + final Subscriber actual; + final SubscriptionArbiter sa; + final Function> mapper; + final Subscriber inner; + final Queue queue; + final int bufferSize; + + Subscription s; + + volatile boolean done; + + volatile long index; + + public SourceSubscriber(Subscriber actual, SubscriptionArbiter sa, + Function> mapper, int bufferSize) { + this.actual = actual; + this.sa = sa; + this.mapper = mapper; + this.bufferSize = bufferSize; + this.inner = new InnerSubscriber<>(actual, sa, this); + Queue q; + if (Pow2.isPowerOfTwo(bufferSize)) { + q = new SpscArrayQueue<>(bufferSize); + } else { + q = new SpscExactArrayQueue<>(bufferSize); + } + this.queue = q; + } + @Override + public void onSubscribe(Subscription s) { + if (this.s != null) { + s.cancel(); + return; + } + this.s = s; + s.request(bufferSize); + } + @Override + public void onNext(T t) { + if (done) { + return; + } + if (!queue.offer(t)) { + cancel(); + actual.onError(new IllegalStateException("More values received than requested!")); + return; + } + if (getAndIncrement() == 0) { + drain(); + } + } + @Override + public void onError(Throwable t) { + if (done) { + return; + } + done = true; + cancel(); + actual.onError(t); + } + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + if (getAndIncrement() == 0) { + drain(); + } + } + + void innerComplete() { + if (decrementAndGet() != 0) { + drain(); + } + if (!done) { + s.request(1); + } + } + + void cancel() { + sa.cancel(); + s.cancel(); + } + + void drain() { + boolean d = done; + T o = queue.poll(); + + if (o == null) { + if (d) { + actual.onComplete(); + return; + } + return; + } + Publisher p; + try { + p = mapper.apply(o); + } catch (Throwable e) { + cancel(); + actual.onError(e); + return; + } + index++; + // this is not RS but since our Subscriber doesn't hold state by itself, + // subscribing it to each source is safe and saves allocation + p.subscribe(inner); + } + } + + static final class InnerSubscriber implements Subscriber { + final Subscriber actual; + final SubscriptionArbiter sa; + final SourceSubscriber parent; + + /* + * FIXME this is a workaround for now, but doesn't work + * for async non-conforming sources. + * Such sources require individual instances of InnerSubscriber and a + * done field. + */ + + long index; + + public InnerSubscriber(Subscriber actual, + SubscriptionArbiter sa, SourceSubscriber parent) { + this.actual = actual; + this.sa = sa; + this.parent = parent; + this.index = 1; + } + + @Override + public void onSubscribe(Subscription s) { + if (index == parent.index) { + sa.setSubscription(s); + } + } + + @Override + public void onNext(U t) { + if (index == parent.index) { + actual.onNext(t); + sa.produced(1L); + } + } + @Override + public void onError(Throwable t) { + if (index == parent.index) { + index++; + parent.cancel(); + actual.onError(t); + } + } + @Override + public void onComplete() { + if (index == parent.index) { + index++; + parent.innerComplete(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivesocket/internal/rx/Pow2.java b/src/main/java/io/reactivesocket/internal/rx/Pow2.java new file mode 100644 index 000000000..332144a26 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/Pow2.java @@ -0,0 +1,46 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + + +/* + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/util/Pow2.java + */ +package io.reactivesocket.internal.rx; + +public final class Pow2 { + private Pow2() { + throw new IllegalStateException("No instances!"); + } + + /** + * Find the next larger positive power of two value up from the given value. If value is a power of two then + * this value will be returned. + * + * @param value from which next positive power of two will be found. + * @return the next positive power of 2 or this value if it is a power of 2. + */ + public static int roundToPowerOfTwo(final int value) { + return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); + } + + /** + * Is this value a power of two. + * + * @param value to be tested to see if it is a power of two. + * @return true if the value is a power of 2 otherwise false. + */ + public static boolean isPowerOfTwo(final int value) { + return (value & (value - 1)) == 0; + } +} diff --git a/src/main/java/io/reactivesocket/internal/rx/QueueDrainHelper.java b/src/main/java/io/reactivesocket/internal/rx/QueueDrainHelper.java new file mode 100644 index 000000000..fbafaff75 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/QueueDrainHelper.java @@ -0,0 +1,280 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.internal.rx; + +import java.util.concurrent.atomic.*; +import java.util.function.BooleanSupplier; + +/** + * Utility class to help with the queue-drain serialization idiom. + */ +public enum QueueDrainHelper { + ; + + /** + * A fast-path queue-drain serialization logic. + *

The decrementing of the state is left to the drain callback. + * @param updater + * @param instance + * @param fastPath called if the instance is uncontended. + * @param queue called if the instance is contended to queue up work + * @param drain called if the instance transitions to the drain state successfully + */ + public static void queueDrain(AtomicIntegerFieldUpdater updater, T instance, + Runnable fastPath, Runnable queue, Runnable drain) { + if (updater.get(instance) == 0 && updater.compareAndSet(instance, 0, 1)) { + fastPath.run(); + if (updater.decrementAndGet(instance) == 0) { + return; + } + } else { + queue.run(); + if (updater.getAndIncrement(instance) != 0) { + return; + } + } + drain.run(); + } + + /** + * A fast-path queue-drain serialization logic with the ability to leave the state + * in fastpath/drain mode or not continue after the call to queue. + *

The decrementing of the state is left to the drain callback. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainIf(AtomicIntegerFieldUpdater updater, T instance, + BooleanSupplier fastPath, BooleanSupplier queue, Runnable drain) { + if (updater.get(instance) == 0 && updater.compareAndSet(instance, 0, 1)) { + if (fastPath.getAsBoolean()) { + return; + } + if (updater.decrementAndGet(instance) == 0) { + return; + } + } else { + if (queue.getAsBoolean()) { + return; + } + if (updater.getAndIncrement(instance) != 0) { + return; + } + } + drain.run(); + } + + /** + * A fast-path queue-drain serialization logic where the drain is looped until + * the instance state reaches 0 again. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainLoop(AtomicIntegerFieldUpdater updater, T instance, + Runnable fastPath, Runnable queue, Runnable drain) { + if (updater.get(instance) == 0 && updater.compareAndSet(instance, 0, 1)) { + fastPath.run(); + if (updater.decrementAndGet(instance) == 0) { + return; + } + } else { + queue.run(); + if (updater.getAndIncrement(instance) != 0) { + return; + } + } + int missed = 1; + for (;;) { + drain.run(); + + missed = updater.addAndGet(instance, -missed); + if (missed == 0) { + return; + } + } + } + + /** + * A fast-path queue-drain serialization logic with looped drain call and the ability to leave the state + * in fastpath/drain mode or not continue after the call to queue. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainLoopIf(AtomicIntegerFieldUpdater updater, T instance, + BooleanSupplier fastPath, BooleanSupplier queue, BooleanSupplier drain) { + if (updater.get(instance) == 0 && updater.compareAndSet(instance, 0, 1)) { + if (fastPath.getAsBoolean()) { + return; + } + if (updater.decrementAndGet(instance) == 0) { + return; + } + } else { + if (queue.getAsBoolean()) { + return; + } + if (updater.getAndIncrement(instance) != 0) { + return; + } + } + int missed = 1; + for (;;) { + + if (drain.getAsBoolean()) { + return; + } + + missed = updater.addAndGet(instance, -missed); + if (missed == 0) { + return; + } + } + } + + /** + * A fast-path queue-drain serialization logic. + *

The decrementing of the state is left to the drain callback. + * @param updater + * @param instance + * @param fastPath called if the instance is uncontended. + * @param queue called if the instance is contended to queue up work + * @param drain called if the instance transitions to the drain state successfully + */ + public static void queueDrain(AtomicInteger instance, + Runnable fastPath, Runnable queue, Runnable drain) { + if (instance.get() == 0 && instance.compareAndSet(0, 1)) { + fastPath.run(); + if (instance.decrementAndGet() == 0) { + return; + } + } else { + queue.run(); + if (instance.getAndIncrement() != 0) { + return; + } + } + drain.run(); + } + + /** + * A fast-path queue-drain serialization logic with the ability to leave the state + * in fastpath/drain mode or not continue after the call to queue. + *

The decrementing of the state is left to the drain callback. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainIf(AtomicInteger instance, + BooleanSupplier fastPath, BooleanSupplier queue, Runnable drain) { + if (instance.get() == 0 && instance.compareAndSet(0, 1)) { + if (fastPath.getAsBoolean()) { + return; + } + if (instance.decrementAndGet() == 0) { + return; + } + } else { + if (queue.getAsBoolean()) { + return; + } + if (instance.getAndIncrement() != 0) { + return; + } + } + drain.run(); + } + + /** + * A fast-path queue-drain serialization logic where the drain is looped until + * the instance state reaches 0 again. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainLoop(AtomicInteger instance, + Runnable fastPath, Runnable queue, Runnable drain) { + if (instance.get() == 0 && instance.compareAndSet(0, 1)) { + fastPath.run(); + if (instance.decrementAndGet() == 0) { + return; + } + } else { + queue.run(); + if (instance.getAndIncrement() != 0) { + return; + } + } + int missed = 1; + for (;;) { + drain.run(); + + missed = instance.addAndGet(-missed); + if (missed == 0) { + return; + } + } + } + + /** + * A fast-path queue-drain serialization logic with looped drain call and the ability to leave the state + * in fastpath/drain mode or not continue after the call to queue. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainLoopIf(AtomicInteger instance, + BooleanSupplier fastPath, BooleanSupplier queue, BooleanSupplier drain) { + if (instance.get() == 0 && instance.compareAndSet(0, 1)) { + if (fastPath.getAsBoolean()) { + return; + } + if (instance.decrementAndGet() == 0) { + return; + } + } else { + if (queue.getAsBoolean()) { + return; + } + if (instance.getAndIncrement() != 0) { + return; + } + } + int missed = 1; + for (;;) { + + if (drain.getAsBoolean()) { + return; + } + + missed = instance.addAndGet(-missed); + if (missed == 0) { + return; + } + } + } + +} diff --git a/src/main/java/io/reactivesocket/internal/rx/README.md b/src/main/java/io/reactivesocket/internal/rx/README.md new file mode 100644 index 000000000..dc1b56023 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/README.md @@ -0,0 +1,3 @@ +RxJava v2 code copy/pasted to here since RxJava v2 is not yet ready to be depended upon (still in design flux, rapid code changes, not even a developer preview on Maven Central yet). + +Someday this package should theoretically go away and RxJava v2 directly used. \ No newline at end of file diff --git a/src/main/java/io/reactivesocket/internal/rx/SerializedSubscriber.java b/src/main/java/io/reactivesocket/internal/rx/SerializedSubscriber.java new file mode 100644 index 000000000..fab2efcca --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/SerializedSubscriber.java @@ -0,0 +1,176 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.internal.rx; + +import org.reactivestreams.*; + + +/** + * Serializes access to the onNext, onError and onComplete methods of another Subscriber. + * + *

Note that onSubscribe is not serialized in respect of the other methods so + * make sure the Subscription is set before any of the other methods are called. + * + *

The implementation assumes that the actual Subscriber's methods don't throw. + * + * @param the value type + */ +public final class SerializedSubscriber implements Subscriber { + final Subscriber actual; + final boolean delayError; + + static final int QUEUE_LINK_SIZE = 4; + + Subscription subscription; + + boolean emitting; + AppendOnlyLinkedArrayList queue; + + volatile boolean done; + + public SerializedSubscriber(Subscriber actual) { + this(actual, false); + } + + public SerializedSubscriber(Subscriber actual, boolean delayError) { + this.actual = actual; + this.delayError = delayError; + } + @Override + public void onSubscribe(Subscription s) { + if (subscription != null) { + s.cancel(); + onError(new IllegalStateException("Subscription already set!")); + return; + } + this.subscription = s; + + actual.onSubscribe(s); + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + if (t == null) { + subscription.cancel(); + onError(new NullPointerException()); + return; + } + synchronized (this) { + if (done) { + return; + } + if (emitting) { + AppendOnlyLinkedArrayList q = queue; + if (q == null) { + q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE); + queue = q; + } + q.add(NotificationLite.next(t)); + return; + } + emitting = true; + } + + actual.onNext(t); + + emitLoop(); + } + + @Override + public void onError(Throwable t) { + if (done) { + return; + } + boolean reportError; + synchronized (this) { + if (done) { + reportError = true; + } else + if (emitting) { + done = true; + AppendOnlyLinkedArrayList q = queue; + if (q == null) { + q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE); + queue = q; + } + Object err = NotificationLite.error(t); + if (delayError) { + q.add(err); + } else { + q.setFirst(err); + } + return; + } else { + done = true; + emitting = true; + reportError = false; + } + } + + if (reportError) { + return; + } + + actual.onError(t); + // no need to loop because this onError is the last event + } + + @Override + public void onComplete() { + if (done) { + return; + } + synchronized (this) { + if (done) { + return; + } + if (emitting) { + AppendOnlyLinkedArrayList q = queue; + if (q == null) { + q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE); + queue = q; + } + q.add(NotificationLite.complete()); + return; + } + done = true; + emitting = true; + } + + actual.onComplete(); + // no need to loop because this onComplete is the last event + } + + void emitLoop() { + for (;;) { + AppendOnlyLinkedArrayList q; + synchronized (this) { + q = queue; + if (q == null) { + emitting = false; + return; + } + queue = null; + } + + q.forEachWhile(this::accept); + } + } + + boolean accept(Object value) { + return NotificationLite.accept(value, actual); + } +} diff --git a/src/main/java/io/reactivesocket/internal/rx/SpscArrayQueue.java b/src/main/java/io/reactivesocket/internal/rx/SpscArrayQueue.java new file mode 100644 index 000000000..348551e68 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/SpscArrayQueue.java @@ -0,0 +1,133 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer. + *

+ * This implementation is a mashup of the Fast Flow + * algorithm with an optimization of the offer method taken from the BQueue algorithm (a variation on Fast + * Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.
+ * For convenience the relevant papers are available in the resources folder:
+ * 2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf
+ * 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf
+ *
This implementation is wait free. + * + * @param + */ +public final class SpscArrayQueue extends BaseArrayQueue { + /** */ + private static final long serialVersionUID = -1296597691183856449L; + private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); + final AtomicLong producerIndex; + protected long producerLookAhead; + final AtomicLong consumerIndex; + final int lookAheadStep; + public SpscArrayQueue(int capacity) { + super(capacity); + this.producerIndex = new AtomicLong(); + this.consumerIndex = new AtomicLong(); + lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); + } + + @Override + public boolean offer(E e) { + if (null == e) { + throw new NullPointerException("Null is not a valid element"); + } + // local load of field to avoid repeated loads after volatile reads + final int mask = this.mask; + final long index = producerIndex.get(); + final int offset = calcElementOffset(index, mask); + if (index >= producerLookAhead) { + int step = lookAheadStep; + if (null == lvElement(calcElementOffset(index + step, mask))) {// LoadLoad + producerLookAhead = index + step; + } + else if (null != lvElement(offset)){ + return false; + } + } + soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() + soElement(offset, e); // StoreStore + return true; + } + + @Override + public E poll() { + final long index = consumerIndex.get(); + final int offset = calcElementOffset(index); + // local load of field to avoid repeated loads after volatile reads + final E e = lvElement(offset);// LoadLoad + if (null == e) { + return null; + } + soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() + soElement(offset, null);// StoreStore + return e; + } + + @Override + public E peek() { + return lvElement(calcElementOffset(consumerIndex.get())); + } + + @Override + public boolean isEmpty() { + return producerIndex.get() == consumerIndex.get(); + } + + @Override + public int size() { + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer + * indices, therefore protection is required to ensure size is within valid range. In the event of concurrent + * polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + return (int) (currentProducerIndex - after); + } + } + } + + private void soProducerIndex(long newIndex) { + producerIndex.lazySet(newIndex); + } + + private void soConsumerIndex(long newIndex) { + consumerIndex.lazySet(newIndex); + } + + private long lvConsumerIndex() { + return consumerIndex.get(); + } + private long lvProducerIndex() { + return producerIndex.get(); + } + +} + diff --git a/src/main/java/io/reactivesocket/internal/rx/SpscExactArrayQueue.java b/src/main/java/io/reactivesocket/internal/rx/SpscExactArrayQueue.java new file mode 100644 index 000000000..41b3664b3 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/SpscExactArrayQueue.java @@ -0,0 +1,164 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * A single-producer single-consumer array-backed queue with exact, non power-of-2 logical capacity. + */ +public final class SpscExactArrayQueue extends AtomicReferenceArray implements Queue { + /** */ + private static final long serialVersionUID = 6210984603741293445L; + final int mask; + final int capacitySkip; + volatile long producerIndex; + volatile long consumerIndex; + + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater PRODUCER_INDEX = + AtomicLongFieldUpdater.newUpdater(SpscExactArrayQueue.class, "producerIndex"); + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater CONSUMER_INDEX = + AtomicLongFieldUpdater.newUpdater(SpscExactArrayQueue.class, "consumerIndex"); + + public SpscExactArrayQueue(int capacity) { + super(Pow2.roundToPowerOfTwo(capacity)); + int len = length(); + this.mask = len - 1; + this.capacitySkip = len - capacity; + } + + + @Override + public boolean offer(T value) { + Objects.requireNonNull(value); + + long pi = producerIndex; + int m = mask; + + int fullCheck = (int)(pi + capacitySkip) & m; + if (get(fullCheck) != null) { + return false; + } + int offset = (int)pi & m; + PRODUCER_INDEX.lazySet(this, pi + 1); + lazySet(offset, value); + return true; + } + @Override + public T poll() { + long ci = consumerIndex; + int offset = (int)ci & mask; + T value = get(offset); + if (value == null) { + return null; + } + CONSUMER_INDEX.lazySet(this, ci + 1); + lazySet(offset, null); + return value; + } + @Override + public T peek() { + return get((int)consumerIndex & mask); + } + @Override + public void clear() { + while (poll() != null || !isEmpty()); + } + @Override + public boolean isEmpty() { + return producerIndex == consumerIndex; + } + + @Override + public int size() { + long ci = consumerIndex; + for (;;) { + long pi = producerIndex; + long ci2 = consumerIndex; + if (ci == ci2) { + return (int)(pi - ci2); + } + ci = ci2; + } + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public E[] toArray(E[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(T e) { + throw new UnsupportedOperationException(); + } + + @Override + public T remove() { + throw new UnsupportedOperationException(); + } + + @Override + public T element() { + throw new UnsupportedOperationException(); + } + +} diff --git a/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java b/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java new file mode 100644 index 000000000..8e729099a --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java @@ -0,0 +1,191 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal.rx; +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +import java.util.*; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.Subscription; + +import io.reactivesocket.internal.BackpressureHelper; +import io.reactivesocket.internal.SubscriptionHelper; + +/** + * Arbitrates requests and cancellation between Subscriptions. + */ +public final class SubscriptionArbiter extends AtomicInteger implements Subscription { + /** */ + private static final long serialVersionUID = -2189523197179400958L; + + final Queue missedSubscription = new MpscLinkedQueue<>(); + + Subscription actual; + long requested; + + volatile boolean cancelled; + + volatile long missedRequested; + static final AtomicLongFieldUpdater MISSED_REQUESTED = + AtomicLongFieldUpdater.newUpdater(SubscriptionArbiter.class, "missedRequested"); + + volatile long missedProduced; + static final AtomicLongFieldUpdater MISSED_PRODUCED = + AtomicLongFieldUpdater.newUpdater(SubscriptionArbiter.class, "missedProduced"); + + private long addRequested(long n) { + long r = requested; + long u = BackpressureHelper.addCap(r, n); + requested = u; + return r; + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validateRequest(n)) { + return; + } + if (cancelled) { + return; + } + QueueDrainHelper.queueDrainLoop(this, () -> { + addRequested(n); + Subscription s = actual; + if (s != null) { + s.request(n); + } + }, () -> { + BackpressureHelper.add(MISSED_REQUESTED, this, n); + }, this::drain); + } + + public void produced(long n) { + if (n <= 0) { + return; + } + QueueDrainHelper.queueDrainLoop(this, () -> { + long r = requested; + if (r == Long.MAX_VALUE) { + return; + } + long u = r - n; + if (u < 0L) { + u = 0; + } + requested = u; + }, () -> { + BackpressureHelper.add(MISSED_PRODUCED, this, n); + }, this::drain); + } + + public void setSubscription(Subscription s) { + Objects.requireNonNull(s); + if (cancelled) { + s.cancel(); + return; + } + QueueDrainHelper.queueDrainLoop(this, () -> { + Subscription a = actual; + if (a != null) { + a.cancel(); + } + actual = s; + long r = requested; + if (r != 0L) { + s.request(r); + } + }, () -> { + missedSubscription.offer(s); + }, this::drain); + } + + @Override + public void cancel() { + if (cancelled) { + return; + } + cancelled = true; + QueueDrainHelper.queueDrainLoop(this, () -> { + Subscription a = actual; + if (a != null) { + actual = null; + a.cancel(); + } + }, () -> { + // nothing to queue + }, this::drain); + } + + public boolean isCancelled() { + return cancelled; + } + + void drain() { + long mr = MISSED_REQUESTED.getAndSet(this, 0L); + long mp = MISSED_PRODUCED.getAndSet(this, 0L); + Subscription ms = missedSubscription.poll(); + boolean c = cancelled; + + long r = requested; + if (r != Long.MAX_VALUE && !c) { + long u = r + mr; + if (u < 0L) { + r = Long.MAX_VALUE; + requested = Long.MAX_VALUE; + } else { + long v = u - mp; + if (v < 0L) { + v = 0L; + } + r = v; + requested = v; + } + } + + Subscription a = actual; + if (c && a != null) { + actual = null; + a.cancel(); + } + + if (ms == null) { + if (a != null && mr != 0L) { + a.request(mr); + } + } else { + if (c) { + ms.cancel(); + } else { + if (a != null) { + a.cancel(); + } + actual = ms; + if (r != 0L) { + ms.request(r); + } + } + } + } +} From 6e26b76c20d0150361f4943d01acd989cc0d5e5f Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Sep 2015 20:14:03 -0700 Subject: [PATCH 2/5] Move Other Rx Classes to .internal.rx --- .../io/reactivesocket/ReactiveSocket.java | 4 +- .../internal/PublisherUtils.java | 4 + .../io/reactivesocket/internal/Requester.java | 3 + .../io/reactivesocket/internal/Responder.java | 2 + .../internal/{ => rx}/BackpressureHelper.java | 2 +- .../internal/{ => rx}/BackpressureUtils.java | 2 +- .../internal/{ => rx}/BooleanDisposable.java | 2 +- .../{ => rx}/CompositeCompletable.java | 2 +- .../{ => rx}/CompositeDisposable.java | 2 +- .../internal/{ => rx}/EmptyDisposable.java | 2 +- .../internal/{ => rx}/EmptySubscription.java | 2 +- .../internal/rx/SubscriptionArbiter.java | 3 - .../internal/rx/SubscriptionHelper.java | 79 +++++++++++++++++++ 13 files changed, 97 insertions(+), 12 deletions(-) rename src/main/java/io/reactivesocket/internal/{ => rx}/BackpressureHelper.java (98%) rename src/main/java/io/reactivesocket/internal/{ => rx}/BackpressureUtils.java (98%) rename src/main/java/io/reactivesocket/internal/{ => rx}/BooleanDisposable.java (95%) rename src/main/java/io/reactivesocket/internal/{ => rx}/CompositeCompletable.java (97%) rename src/main/java/io/reactivesocket/internal/{ => rx}/CompositeDisposable.java (96%) rename src/main/java/io/reactivesocket/internal/{ => rx}/EmptyDisposable.java (95%) rename src/main/java/io/reactivesocket/internal/{ => rx}/EmptySubscription.java (98%) create mode 100644 src/main/java/io/reactivesocket/internal/rx/SubscriptionHelper.java diff --git a/src/main/java/io/reactivesocket/ReactiveSocket.java b/src/main/java/io/reactivesocket/ReactiveSocket.java index 89e52a5f7..7cd372a5c 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -26,10 +26,10 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import io.reactivesocket.internal.CompositeCompletable; -import io.reactivesocket.internal.CompositeDisposable; import io.reactivesocket.internal.Requester; import io.reactivesocket.internal.Responder; +import io.reactivesocket.internal.rx.CompositeCompletable; +import io.reactivesocket.internal.rx.CompositeDisposable; import io.reactivesocket.observable.Disposable; import io.reactivesocket.observable.Observable; import io.reactivesocket.observable.Observer; diff --git a/src/main/java/io/reactivesocket/internal/PublisherUtils.java b/src/main/java/io/reactivesocket/internal/PublisherUtils.java index cf9b92e4d..78c47343a 100644 --- a/src/main/java/io/reactivesocket/internal/PublisherUtils.java +++ b/src/main/java/io/reactivesocket/internal/PublisherUtils.java @@ -27,6 +27,10 @@ import io.reactivesocket.Frame; import io.reactivesocket.Payload; +import io.reactivesocket.internal.rx.BackpressureHelper; +import io.reactivesocket.internal.rx.BackpressureUtils; +import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.internal.rx.SubscriptionHelper; public class PublisherUtils { diff --git a/src/main/java/io/reactivesocket/internal/Requester.java b/src/main/java/io/reactivesocket/internal/Requester.java index aecdaf030..21cd0304c 100644 --- a/src/main/java/io/reactivesocket/internal/Requester.java +++ b/src/main/java/io/reactivesocket/internal/Requester.java @@ -38,6 +38,9 @@ import io.reactivesocket.exceptions.CancelException; import io.reactivesocket.exceptions.Exceptions; import io.reactivesocket.exceptions.Retryable; +import io.reactivesocket.internal.rx.BackpressureUtils; +import io.reactivesocket.internal.rx.EmptyDisposable; +import io.reactivesocket.internal.rx.EmptySubscription; import io.reactivesocket.observable.Disposable; import io.reactivesocket.observable.Observer; import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; diff --git a/src/main/java/io/reactivesocket/internal/Responder.java b/src/main/java/io/reactivesocket/internal/Responder.java index 186c43f51..0fbc9cbf2 100644 --- a/src/main/java/io/reactivesocket/internal/Responder.java +++ b/src/main/java/io/reactivesocket/internal/Responder.java @@ -28,6 +28,8 @@ import org.reactivestreams.Subscription; import io.reactivesocket.exceptions.SetupException; +import io.reactivesocket.internal.rx.EmptyDisposable; +import io.reactivesocket.internal.rx.EmptySubscription; import io.reactivesocket.observable.Disposable; import io.reactivesocket.observable.Observer; import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; diff --git a/src/main/java/io/reactivesocket/internal/BackpressureHelper.java b/src/main/java/io/reactivesocket/internal/rx/BackpressureHelper.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/BackpressureHelper.java rename to src/main/java/io/reactivesocket/internal/rx/BackpressureHelper.java index bee59b440..cd51ac01b 100644 --- a/src/main/java/io/reactivesocket/internal/BackpressureHelper.java +++ b/src/main/java/io/reactivesocket/internal/rx/BackpressureHelper.java @@ -10,7 +10,7 @@ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import java.util.concurrent.atomic.*; diff --git a/src/main/java/io/reactivesocket/internal/BackpressureUtils.java b/src/main/java/io/reactivesocket/internal/rx/BackpressureUtils.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/BackpressureUtils.java rename to src/main/java/io/reactivesocket/internal/rx/BackpressureUtils.java index b8ce796f7..d67598d1f 100644 --- a/src/main/java/io/reactivesocket/internal/BackpressureUtils.java +++ b/src/main/java/io/reactivesocket/internal/rx/BackpressureUtils.java @@ -1,4 +1,4 @@ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; /** * Copyright 2015 Netflix, Inc. diff --git a/src/main/java/io/reactivesocket/internal/BooleanDisposable.java b/src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java similarity index 95% rename from src/main/java/io/reactivesocket/internal/BooleanDisposable.java rename to src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java index 17f75124f..080dbceef 100644 --- a/src/main/java/io/reactivesocket/internal/BooleanDisposable.java +++ b/src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java @@ -1,4 +1,4 @@ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; diff --git a/src/main/java/io/reactivesocket/internal/CompositeCompletable.java b/src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java similarity index 97% rename from src/main/java/io/reactivesocket/internal/CompositeCompletable.java rename to src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java index 145853732..131d0c719 100644 --- a/src/main/java/io/reactivesocket/internal/CompositeCompletable.java +++ b/src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java @@ -1,4 +1,4 @@ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import java.util.HashSet; import java.util.Set; diff --git a/src/main/java/io/reactivesocket/internal/CompositeDisposable.java b/src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java similarity index 96% rename from src/main/java/io/reactivesocket/internal/CompositeDisposable.java rename to src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java index 506007d3f..6b74b8994 100644 --- a/src/main/java/io/reactivesocket/internal/CompositeDisposable.java +++ b/src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java @@ -1,4 +1,4 @@ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import java.util.HashSet; import java.util.Set; diff --git a/src/main/java/io/reactivesocket/internal/EmptyDisposable.java b/src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java similarity index 95% rename from src/main/java/io/reactivesocket/internal/EmptyDisposable.java rename to src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java index e878d81a6..1e6cd4723 100644 --- a/src/main/java/io/reactivesocket/internal/EmptyDisposable.java +++ b/src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import io.reactivesocket.observable.Disposable; diff --git a/src/main/java/io/reactivesocket/internal/EmptySubscription.java b/src/main/java/io/reactivesocket/internal/rx/EmptySubscription.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/EmptySubscription.java rename to src/main/java/io/reactivesocket/internal/rx/EmptySubscription.java index cb2c58581..fe2c38685 100644 --- a/src/main/java/io/reactivesocket/internal/EmptySubscription.java +++ b/src/main/java/io/reactivesocket/internal/rx/EmptySubscription.java @@ -11,7 +11,7 @@ * the License for the specific language governing permissions and limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import org.reactivestreams.*; diff --git a/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java b/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java index 8e729099a..233ab3472 100644 --- a/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java +++ b/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java @@ -30,9 +30,6 @@ import org.reactivestreams.Subscription; -import io.reactivesocket.internal.BackpressureHelper; -import io.reactivesocket.internal.SubscriptionHelper; - /** * Arbitrates requests and cancellation between Subscriptions. */ diff --git a/src/main/java/io/reactivesocket/internal/rx/SubscriptionHelper.java b/src/main/java/io/reactivesocket/internal/rx/SubscriptionHelper.java new file mode 100644 index 000000000..ad72a8d57 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/SubscriptionHelper.java @@ -0,0 +1,79 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal.rx; + +import org.reactivestreams.*; + +public enum SubscriptionHelper { + ; + + public static boolean validateSubscription(Subscription current, Subscription next) { + if (next == null) { + return true; + } + if (current != null) { + next.cancel(); + return true; + } + return false; + } + + /** + *

+ * Make sure error reporting via s.onError is serialized. + * + * @param current + * @param next + * @param s + * @return + */ + public static boolean validateSubscription(Subscription current, Subscription next, Subscriber s) { + if (next == null) { + s.onError(new NullPointerException("next is null")); + return true; + } + if (current != null) { + next.cancel(); + return true; + } + return false; + } + + public static boolean validateRequest(long n) { + if (n <= 0) { + return true; + } + return false; + } + + /** + *

+ * Make sure error reporting via s.onError is serialized. + * + * @param n + * @param current + * @param s + * @return + */ + public static boolean validateRequest(long n, Subscription current, Subscriber s) { + if (n <= 0) { + if (current != null) { + current.cancel(); + } + s.onError(new IllegalArgumentException("n > 0 required but it was " + n)); + return true; + } + return false; + } +} From 32959728d247bb901407bfdad05b6474198853c8 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Sep 2015 20:17:25 -0700 Subject: [PATCH 3/5] Frame Related Classes in Separate Package --- .../ConnectionSetupPayload.java | 4 +- src/main/java/io/reactivesocket/Frame.java | 8 ++ .../reactivesocket/exceptions/Exceptions.java | 2 +- .../internal/FragmentedPublisher.java | 57 +++++++++++++ .../io/reactivesocket/internal/Requester.java | 1 + .../io/reactivesocket/internal/Responder.java | 9 ++- .../internal/SubscriptionHelper.java | 79 ------------------- .../internal/{ => frame}/ByteBufferUtil.java | 2 +- .../{ => frame}/ErrorFrameFlyweight.java | 2 +- .../{ => frame}/FrameHeaderFlyweight.java | 6 +- .../internal/{ => frame}/FramePool.java | 2 +- .../{ => frame}/KeepaliveFrameFlyweight.java | 2 +- .../{ => frame}/LeaseFrameFlyweight.java | 2 +- .../internal/{ => frame}/PayloadBuilder.java | 2 +- .../{ => frame}/PayloadFragmenter.java | 2 +- .../{ => frame}/PayloadReassembler.java | 3 +- .../{ => frame}/RequestFrameFlyweight.java | 2 +- .../{ => frame}/RequestNFrameFlyweight.java | 2 +- .../{ => frame}/SetupFrameFlyweight.java | 2 +- .../{ => frame}/ThreadLocalFramePool.java | 2 +- .../{ => frame}/ThreadSafeFramePool.java | 2 +- .../internal/{ => frame}/UnpooledFrame.java | 2 +- .../java/io/reactivesocket/FrameTest.java | 5 +- .../internal/FragmenterTest.java | 3 + .../internal/ReassemblerTest.java | 2 + 25 files changed, 100 insertions(+), 105 deletions(-) create mode 100644 src/main/java/io/reactivesocket/internal/FragmentedPublisher.java delete mode 100644 src/main/java/io/reactivesocket/internal/SubscriptionHelper.java rename src/main/java/io/reactivesocket/internal/{ => frame}/ByteBufferUtil.java (97%) rename src/main/java/io/reactivesocket/internal/{ => frame}/ErrorFrameFlyweight.java (98%) rename src/main/java/io/reactivesocket/internal/{ => frame}/FrameHeaderFlyweight.java (99%) rename src/main/java/io/reactivesocket/internal/{ => frame}/FramePool.java (96%) rename src/main/java/io/reactivesocket/internal/{ => frame}/KeepaliveFrameFlyweight.java (97%) rename src/main/java/io/reactivesocket/internal/{ => frame}/LeaseFrameFlyweight.java (98%) rename src/main/java/io/reactivesocket/internal/{ => frame}/PayloadBuilder.java (99%) rename src/main/java/io/reactivesocket/internal/{ => frame}/PayloadFragmenter.java (99%) rename src/main/java/io/reactivesocket/internal/{ => frame}/PayloadReassembler.java (98%) rename src/main/java/io/reactivesocket/internal/{ => frame}/RequestFrameFlyweight.java (98%) rename src/main/java/io/reactivesocket/internal/{ => frame}/RequestNFrameFlyweight.java (98%) rename src/main/java/io/reactivesocket/internal/{ => frame}/SetupFrameFlyweight.java (99%) rename src/main/java/io/reactivesocket/internal/{ => frame}/ThreadLocalFramePool.java (98%) rename src/main/java/io/reactivesocket/internal/{ => frame}/ThreadSafeFramePool.java (98%) rename src/main/java/io/reactivesocket/internal/{ => frame}/UnpooledFrame.java (97%) diff --git a/src/main/java/io/reactivesocket/ConnectionSetupPayload.java b/src/main/java/io/reactivesocket/ConnectionSetupPayload.java index f7f77052d..fa6e4dd07 100644 --- a/src/main/java/io/reactivesocket/ConnectionSetupPayload.java +++ b/src/main/java/io/reactivesocket/ConnectionSetupPayload.java @@ -15,10 +15,10 @@ */ package io.reactivesocket; -import io.reactivesocket.internal.SetupFrameFlyweight; - import java.nio.ByteBuffer; +import io.reactivesocket.internal.frame.SetupFrameFlyweight; + /** * Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data */ diff --git a/src/main/java/io/reactivesocket/Frame.java b/src/main/java/io/reactivesocket/Frame.java index b9da0ac4d..dd29c8e35 100644 --- a/src/main/java/io/reactivesocket/Frame.java +++ b/src/main/java/io/reactivesocket/Frame.java @@ -16,6 +16,14 @@ package io.reactivesocket; import io.reactivesocket.internal.*; +import io.reactivesocket.internal.frame.ErrorFrameFlyweight; +import io.reactivesocket.internal.frame.FrameHeaderFlyweight; +import io.reactivesocket.internal.frame.FramePool; +import io.reactivesocket.internal.frame.LeaseFrameFlyweight; +import io.reactivesocket.internal.frame.RequestFrameFlyweight; +import io.reactivesocket.internal.frame.RequestNFrameFlyweight; +import io.reactivesocket.internal.frame.SetupFrameFlyweight; +import io.reactivesocket.internal.frame.UnpooledFrame; import uk.co.real_logic.agrona.DirectBuffer; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/main/java/io/reactivesocket/exceptions/Exceptions.java b/src/main/java/io/reactivesocket/exceptions/Exceptions.java index 93848cd99..7b2581b80 100644 --- a/src/main/java/io/reactivesocket/exceptions/Exceptions.java +++ b/src/main/java/io/reactivesocket/exceptions/Exceptions.java @@ -19,7 +19,7 @@ import java.nio.ByteBuffer; -import static io.reactivesocket.internal.ErrorFrameFlyweight.*; +import static io.reactivesocket.internal.frame.ErrorFrameFlyweight.*; import static java.nio.charset.StandardCharsets.UTF_8; public class Exceptions { diff --git a/src/main/java/io/reactivesocket/internal/FragmentedPublisher.java b/src/main/java/io/reactivesocket/internal/FragmentedPublisher.java new file mode 100644 index 000000000..4a4f21c6f --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/FragmentedPublisher.java @@ -0,0 +1,57 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.internal; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.reactivesocket.Frame; +import io.reactivesocket.FrameType; +import io.reactivesocket.Payload; +import io.reactivesocket.internal.frame.PayloadFragmenter; + +public class FragmentedPublisher implements Publisher { + + private final PayloadFragmenter fragmenter = new PayloadFragmenter(Frame.METADATA_MTU, Frame.DATA_MTU); + private final Publisher responsePublisher; + private final int streamId; + private final FrameType type; + + public FragmentedPublisher(FrameType type, int streamId, Publisher responsePublisher) { + this.type = type; + this.streamId = streamId; + this.responsePublisher = responsePublisher; + } + + @Override + public void subscribe(Subscriber child) { + child.onSubscribe(new Subscription() { + + @Override + public void request(long n) { + // TODO Auto-generated method stub + + } + + @Override + public void cancel() { + // TODO Auto-generated method stub + + }}); + } + +} diff --git a/src/main/java/io/reactivesocket/internal/Requester.java b/src/main/java/io/reactivesocket/internal/Requester.java index 21cd0304c..6f252a044 100644 --- a/src/main/java/io/reactivesocket/internal/Requester.java +++ b/src/main/java/io/reactivesocket/internal/Requester.java @@ -38,6 +38,7 @@ import io.reactivesocket.exceptions.CancelException; import io.reactivesocket.exceptions.Exceptions; import io.reactivesocket.exceptions.Retryable; +import io.reactivesocket.internal.frame.RequestFrameFlyweight; import io.reactivesocket.internal.rx.BackpressureUtils; import io.reactivesocket.internal.rx.EmptyDisposable; import io.reactivesocket.internal.rx.EmptySubscription; diff --git a/src/main/java/io/reactivesocket/internal/Responder.java b/src/main/java/io/reactivesocket/internal/Responder.java index 0fbc9cbf2..4c944ee87 100644 --- a/src/main/java/io/reactivesocket/internal/Responder.java +++ b/src/main/java/io/reactivesocket/internal/Responder.java @@ -28,6 +28,7 @@ import org.reactivestreams.Subscription; import io.reactivesocket.exceptions.SetupException; +import io.reactivesocket.internal.frame.SetupFrameFlyweight; import io.reactivesocket.internal.rx.EmptyDisposable; import io.reactivesocket.internal.rx.EmptySubscription; import io.reactivesocket.observable.Disposable; @@ -243,6 +244,7 @@ public void onNext(Frame requestFrame) { final RejectedException exception = new RejectedException("No associated lease"); responsePublisher = PublisherUtils.errorFrame(streamId, exception); } + connection.addOutput(responsePublisher, new Completable() { @Override @@ -333,7 +335,7 @@ public void request(long n) { if (n > 0 && started.compareAndSet(false, true)) { final int streamId = requestFrame.getStreamId(); - requestHandler.handleRequestResponse(requestFrame).subscribe(new Subscriber() { + new FragmentedPublisher(FrameType.NEXT_COMPLETE, streamId, requestHandler.handleRequestResponse(requestFrame)).subscribe(new Subscriber() { // event emission is serialized so this doesn't need to be atomic int count = 0; @@ -349,12 +351,11 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(Payload v) { + public void onNext(Frame v) { if (++count > 1) { onError(new IllegalStateException("RequestResponse expects a single onNext")); } else { - - child.onNext(Frame.Response.from(streamId, FrameType.NEXT_COMPLETE, v)); + child.onNext(v); } } diff --git a/src/main/java/io/reactivesocket/internal/SubscriptionHelper.java b/src/main/java/io/reactivesocket/internal/SubscriptionHelper.java deleted file mode 100644 index d0f6970ff..000000000 --- a/src/main/java/io/reactivesocket/internal/SubscriptionHelper.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivesocket.internal; - -import org.reactivestreams.*; - -public enum SubscriptionHelper { - ; - - public static boolean validateSubscription(Subscription current, Subscription next) { - if (next == null) { - return true; - } - if (current != null) { - next.cancel(); - return true; - } - return false; - } - - /** - *

- * Make sure error reporting via s.onError is serialized. - * - * @param current - * @param next - * @param s - * @return - */ - public static boolean validateSubscription(Subscription current, Subscription next, Subscriber s) { - if (next == null) { - s.onError(new NullPointerException("next is null")); - return true; - } - if (current != null) { - next.cancel(); - return true; - } - return false; - } - - public static boolean validateRequest(long n) { - if (n <= 0) { - return true; - } - return false; - } - - /** - *

- * Make sure error reporting via s.onError is serialized. - * - * @param n - * @param current - * @param s - * @return - */ - public static boolean validateRequest(long n, Subscription current, Subscriber s) { - if (n <= 0) { - if (current != null) { - current.cancel(); - } - s.onError(new IllegalArgumentException("n > 0 required but it was " + n)); - return true; - } - return false; - } -} diff --git a/src/main/java/io/reactivesocket/internal/ByteBufferUtil.java b/src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java similarity index 97% rename from src/main/java/io/reactivesocket/internal/ByteBufferUtil.java rename to src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java index 14e21f99b..b6cc0fa0c 100644 --- a/src/main/java/io/reactivesocket/internal/ByteBufferUtil.java +++ b/src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import java.nio.ByteBuffer; diff --git a/src/main/java/io/reactivesocket/internal/ErrorFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/ErrorFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java index 57f34ef25..40dbcc115 100644 --- a/src/main/java/io/reactivesocket/internal/ErrorFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import io.reactivesocket.exceptions.*; diff --git a/src/main/java/io/reactivesocket/internal/FrameHeaderFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java similarity index 99% rename from src/main/java/io/reactivesocket/internal/FrameHeaderFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java index cb83b2d9e..be6c61a76 100644 --- a/src/main/java/io/reactivesocket/internal/FrameHeaderFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java @@ -13,18 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; import uk.co.real_logic.agrona.DirectBuffer; import uk.co.real_logic.agrona.MutableDirectBuffer; +import static io.reactivesocket.internal.frame.ByteBufferUtil.*; + import java.nio.ByteBuffer; import java.nio.ByteOrder; -import static io.reactivesocket.internal.ByteBufferUtil.preservingSlice; - /** * Per connection frame flyweight. * diff --git a/src/main/java/io/reactivesocket/internal/FramePool.java b/src/main/java/io/reactivesocket/internal/frame/FramePool.java similarity index 96% rename from src/main/java/io/reactivesocket/internal/FramePool.java rename to src/main/java/io/reactivesocket/internal/frame/FramePool.java index 134295c84..4290891cb 100644 --- a/src/main/java/io/reactivesocket/internal/FramePool.java +++ b/src/main/java/io/reactivesocket/internal/frame/FramePool.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/main/java/io/reactivesocket/internal/KeepaliveFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java similarity index 97% rename from src/main/java/io/reactivesocket/internal/KeepaliveFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java index 4b68cd0ad..8e03b8904 100644 --- a/src/main/java/io/reactivesocket/internal/KeepaliveFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.DirectBuffer; diff --git a/src/main/java/io/reactivesocket/internal/LeaseFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/LeaseFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java index af3375ba5..8a663fca5 100644 --- a/src/main/java/io/reactivesocket/internal/LeaseFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; diff --git a/src/main/java/io/reactivesocket/internal/PayloadBuilder.java b/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java similarity index 99% rename from src/main/java/io/reactivesocket/internal/PayloadBuilder.java rename to src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java index 71358f7cb..a068ab96e 100644 --- a/src/main/java/io/reactivesocket/internal/PayloadBuilder.java +++ b/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import io.reactivesocket.Payload; diff --git a/src/main/java/io/reactivesocket/internal/PayloadFragmenter.java b/src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java similarity index 99% rename from src/main/java/io/reactivesocket/internal/PayloadFragmenter.java rename to src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java index 7fc9d5062..124245726 100644 --- a/src/main/java/io/reactivesocket/internal/PayloadFragmenter.java +++ b/src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import io.reactivesocket.FrameType; diff --git a/src/main/java/io/reactivesocket/internal/PayloadReassembler.java b/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/PayloadReassembler.java rename to src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java index 4598b8415..3f5f5ea7b 100644 --- a/src/main/java/io/reactivesocket/internal/PayloadReassembler.java +++ b/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java @@ -13,10 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import io.reactivesocket.Payload; + import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; diff --git a/src/main/java/io/reactivesocket/internal/RequestFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/RequestFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java index f1ad7b6af..88e67db1c 100644 --- a/src/main/java/io/reactivesocket/internal/RequestFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; diff --git a/src/main/java/io/reactivesocket/internal/RequestNFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/RequestNFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java index e56cbfde5..7e0f4dea1 100644 --- a/src/main/java/io/reactivesocket/internal/RequestNFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; diff --git a/src/main/java/io/reactivesocket/internal/SetupFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java similarity index 99% rename from src/main/java/io/reactivesocket/internal/SetupFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java index 18e89f87c..bebe4b34c 100644 --- a/src/main/java/io/reactivesocket/internal/SetupFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; diff --git a/src/main/java/io/reactivesocket/internal/ThreadLocalFramePool.java b/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/ThreadLocalFramePool.java rename to src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java index e2250872f..6a4420246 100644 --- a/src/main/java/io/reactivesocket/internal/ThreadLocalFramePool.java +++ b/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java b/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java rename to src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java index f990e3bf3..8842a38d7 100644 --- a/src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java +++ b/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/main/java/io/reactivesocket/internal/UnpooledFrame.java b/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java similarity index 97% rename from src/main/java/io/reactivesocket/internal/UnpooledFrame.java rename to src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java index 739f626fe..c5de2775e 100644 --- a/src/main/java/io/reactivesocket/internal/UnpooledFrame.java +++ b/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/test/java/io/reactivesocket/FrameTest.java b/src/test/java/io/reactivesocket/FrameTest.java index 3a6b66631..9cd28eac3 100644 --- a/src/test/java/io/reactivesocket/FrameTest.java +++ b/src/test/java/io/reactivesocket/FrameTest.java @@ -21,7 +21,8 @@ import java.util.concurrent.TimeUnit; import io.reactivesocket.exceptions.RejectedException; -import io.reactivesocket.internal.SetupFrameFlyweight; +import io.reactivesocket.internal.frame.SetupFrameFlyweight; + import org.junit.Test; import org.junit.experimental.theories.DataPoint; import org.junit.experimental.theories.Theories; @@ -29,7 +30,7 @@ import org.junit.runner.RunWith; import uk.co.real_logic.agrona.concurrent.UnsafeBuffer; -import static io.reactivesocket.internal.ErrorFrameFlyweight.*; +import static io.reactivesocket.internal.frame.ErrorFrameFlyweight.*; import static java.nio.charset.StandardCharsets.UTF_8; @RunWith(Theories.class) diff --git a/src/test/java/io/reactivesocket/internal/FragmenterTest.java b/src/test/java/io/reactivesocket/internal/FragmenterTest.java index ba6ecd598..686394d08 100644 --- a/src/test/java/io/reactivesocket/internal/FragmenterTest.java +++ b/src/test/java/io/reactivesocket/internal/FragmenterTest.java @@ -18,6 +18,9 @@ import io.reactivesocket.Frame; import io.reactivesocket.Payload; import io.reactivesocket.TestUtil; +import io.reactivesocket.internal.frame.FrameHeaderFlyweight; +import io.reactivesocket.internal.frame.PayloadFragmenter; + import org.junit.Test; import static org.junit.Assert.*; diff --git a/src/test/java/io/reactivesocket/internal/ReassemblerTest.java b/src/test/java/io/reactivesocket/internal/ReassemblerTest.java index dca8a3859..0b134d697 100644 --- a/src/test/java/io/reactivesocket/internal/ReassemblerTest.java +++ b/src/test/java/io/reactivesocket/internal/ReassemblerTest.java @@ -19,6 +19,8 @@ import io.reactivesocket.FrameType; import io.reactivesocket.Payload; import io.reactivesocket.TestUtil; +import io.reactivesocket.internal.frame.FrameHeaderFlyweight; +import io.reactivesocket.internal.frame.PayloadReassembler; import io.reactivex.subjects.ReplaySubject; import org.junit.Test; From 42a23e80db82ae9365606cc85ce86082115c7901 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Sep 2015 20:20:24 -0700 Subject: [PATCH 4/5] Package Organization (.lease, .rx) --- src/main/java/io/reactivesocket/DuplexConnection.java | 3 ++- src/main/java/io/reactivesocket/LeaseGovernor.java | 2 ++ src/main/java/io/reactivesocket/ReactiveSocket.java | 7 ++++--- src/main/java/io/reactivesocket/internal/Requester.java | 6 +++--- src/main/java/io/reactivesocket/internal/Responder.java | 5 +++-- .../io/reactivesocket/internal/rx/BooleanDisposable.java | 2 +- .../reactivesocket/internal/rx/CompositeCompletable.java | 2 +- .../reactivesocket/internal/rx/CompositeDisposable.java | 4 ++-- .../io/reactivesocket/internal/rx/EmptyDisposable.java | 2 +- .../io/reactivesocket/{ => lease}/FairLeaseGovernor.java | 4 +++- .../io/reactivesocket/{ => lease}/NullLeaseGovernor.java | 4 +++- .../{ => lease}/UnlimitedLeaseGovernor.java | 4 +++- .../java/io/reactivesocket/{ => rx}/Completable.java | 2 +- .../io/reactivesocket/{observable => rx}/Disposable.java | 2 +- .../io/reactivesocket/{observable => rx}/Observable.java | 2 +- .../io/reactivesocket/{observable => rx}/Observer.java | 2 +- .../java/io/reactivesocket/{observable => rx}/README.md | 2 +- src/perf/java/io/reactivesocket/ReactiveSocketPerf.java | 1 + .../io/reactivesocket/perfutil/PerfTestConnection.java | 4 ++-- .../perfutil/PerfUnicastSubjectNoBackpressure.java | 6 +++--- src/test/java/io/reactivesocket/LatchedCompletable.java | 2 ++ src/test/java/io/reactivesocket/ReactiveSocketTest.java | 1 + src/test/java/io/reactivesocket/SerializedEventBus.java | 4 ++-- src/test/java/io/reactivesocket/TestConnection.java | 9 +++++---- .../TestConnectionWithControlledRequestN.java | 2 ++ .../java/io/reactivesocket/TestTransportRequestN.java | 1 + .../java/io/reactivesocket/internal/RequesterTest.java | 2 +- 27 files changed, 53 insertions(+), 34 deletions(-) rename src/main/java/io/reactivesocket/{ => lease}/FairLeaseGovernor.java (95%) rename src/main/java/io/reactivesocket/{ => lease}/NullLeaseGovernor.java (76%) rename src/main/java/io/reactivesocket/{ => lease}/UnlimitedLeaseGovernor.java (79%) rename src/main/java/io/reactivesocket/{ => rx}/Completable.java (79%) rename src/main/java/io/reactivesocket/{observable => rx}/Disposable.java (61%) rename src/main/java/io/reactivesocket/{observable => rx}/Observable.java (66%) rename src/main/java/io/reactivesocket/{observable => rx}/Observer.java (81%) rename src/main/java/io/reactivesocket/{observable => rx}/README.md (61%) diff --git a/src/main/java/io/reactivesocket/DuplexConnection.java b/src/main/java/io/reactivesocket/DuplexConnection.java index 41468afb7..e1811ed33 100644 --- a/src/main/java/io/reactivesocket/DuplexConnection.java +++ b/src/main/java/io/reactivesocket/DuplexConnection.java @@ -19,7 +19,8 @@ import org.reactivestreams.Publisher; -import io.reactivesocket.observable.Observable; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Observable; /** * Represents a connection with input/output that the protocol uses. diff --git a/src/main/java/io/reactivesocket/LeaseGovernor.java b/src/main/java/io/reactivesocket/LeaseGovernor.java index eb1220131..854958adc 100644 --- a/src/main/java/io/reactivesocket/LeaseGovernor.java +++ b/src/main/java/io/reactivesocket/LeaseGovernor.java @@ -1,6 +1,8 @@ package io.reactivesocket; import io.reactivesocket.internal.Responder; +import io.reactivesocket.lease.NullLeaseGovernor; +import io.reactivesocket.lease.UnlimitedLeaseGovernor; public interface LeaseGovernor { public static final LeaseGovernor NULL_LEASE_GOVERNOR = new NullLeaseGovernor(); diff --git a/src/main/java/io/reactivesocket/ReactiveSocket.java b/src/main/java/io/reactivesocket/ReactiveSocket.java index 7cd372a5c..a3f6b204d 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -30,9 +30,10 @@ import io.reactivesocket.internal.Responder; import io.reactivesocket.internal.rx.CompositeCompletable; import io.reactivesocket.internal.rx.CompositeDisposable; -import io.reactivesocket.observable.Disposable; -import io.reactivesocket.observable.Observable; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; +import io.reactivesocket.rx.Observable; +import io.reactivesocket.rx.Observer; import uk.co.real_logic.agrona.BitUtil; /** diff --git a/src/main/java/io/reactivesocket/internal/Requester.java b/src/main/java/io/reactivesocket/internal/Requester.java index 6f252a044..2ec6b52fe 100644 --- a/src/main/java/io/reactivesocket/internal/Requester.java +++ b/src/main/java/io/reactivesocket/internal/Requester.java @@ -29,7 +29,6 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import io.reactivesocket.Completable; import io.reactivesocket.ConnectionSetupPayload; import io.reactivesocket.DuplexConnection; import io.reactivesocket.Frame; @@ -42,8 +41,9 @@ import io.reactivesocket.internal.rx.BackpressureUtils; import io.reactivesocket.internal.rx.EmptyDisposable; import io.reactivesocket.internal.rx.EmptySubscription; -import io.reactivesocket.observable.Disposable; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; +import io.reactivesocket.rx.Observer; import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; /** diff --git a/src/main/java/io/reactivesocket/internal/Responder.java b/src/main/java/io/reactivesocket/internal/Responder.java index 4c944ee87..02636cbe8 100644 --- a/src/main/java/io/reactivesocket/internal/Responder.java +++ b/src/main/java/io/reactivesocket/internal/Responder.java @@ -31,8 +31,9 @@ import io.reactivesocket.internal.frame.SetupFrameFlyweight; import io.reactivesocket.internal.rx.EmptyDisposable; import io.reactivesocket.internal.rx.EmptySubscription; -import io.reactivesocket.observable.Disposable; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; +import io.reactivesocket.rx.Observer; import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; /** diff --git a/src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java b/src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java index 080dbceef..6e4b70d27 100644 --- a/src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java +++ b/src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java @@ -2,7 +2,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import io.reactivesocket.observable.Disposable; +import io.reactivesocket.rx.Disposable; public final class BooleanDisposable implements Disposable { volatile Runnable run; diff --git a/src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java b/src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java index 131d0c719..7d4f4a020 100644 --- a/src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java +++ b/src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java @@ -3,7 +3,7 @@ import java.util.HashSet; import java.util.Set; -import io.reactivesocket.Completable; +import io.reactivesocket.rx.Completable; /** * A Completable container that can hold onto multiple other Completables. diff --git a/src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java b/src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java index 6b74b8994..f8cbf5b1b 100644 --- a/src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java +++ b/src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java @@ -3,8 +3,8 @@ import java.util.HashSet; import java.util.Set; -import io.reactivesocket.Completable; -import io.reactivesocket.observable.Disposable; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; /** * A Disposable container that can hold onto multiple other Disposables. diff --git a/src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java b/src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java index 1e6cd4723..785fd7856 100644 --- a/src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java +++ b/src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java @@ -15,7 +15,7 @@ */ package io.reactivesocket.internal.rx; -import io.reactivesocket.observable.Disposable; +import io.reactivesocket.rx.Disposable; public class EmptyDisposable implements Disposable { diff --git a/src/main/java/io/reactivesocket/FairLeaseGovernor.java b/src/main/java/io/reactivesocket/lease/FairLeaseGovernor.java similarity index 95% rename from src/main/java/io/reactivesocket/FairLeaseGovernor.java rename to src/main/java/io/reactivesocket/lease/FairLeaseGovernor.java index b45176150..437a81998 100644 --- a/src/main/java/io/reactivesocket/FairLeaseGovernor.java +++ b/src/main/java/io/reactivesocket/lease/FairLeaseGovernor.java @@ -1,5 +1,7 @@ -package io.reactivesocket; +package io.reactivesocket.lease; +import io.reactivesocket.Frame; +import io.reactivesocket.LeaseGovernor; import io.reactivesocket.internal.Responder; import java.util.HashMap; diff --git a/src/main/java/io/reactivesocket/NullLeaseGovernor.java b/src/main/java/io/reactivesocket/lease/NullLeaseGovernor.java similarity index 76% rename from src/main/java/io/reactivesocket/NullLeaseGovernor.java rename to src/main/java/io/reactivesocket/lease/NullLeaseGovernor.java index 4fd85f8dd..a08fc1bac 100644 --- a/src/main/java/io/reactivesocket/NullLeaseGovernor.java +++ b/src/main/java/io/reactivesocket/lease/NullLeaseGovernor.java @@ -1,5 +1,7 @@ -package io.reactivesocket; +package io.reactivesocket.lease; +import io.reactivesocket.Frame; +import io.reactivesocket.LeaseGovernor; import io.reactivesocket.internal.Responder; public class NullLeaseGovernor implements LeaseGovernor { diff --git a/src/main/java/io/reactivesocket/UnlimitedLeaseGovernor.java b/src/main/java/io/reactivesocket/lease/UnlimitedLeaseGovernor.java similarity index 79% rename from src/main/java/io/reactivesocket/UnlimitedLeaseGovernor.java rename to src/main/java/io/reactivesocket/lease/UnlimitedLeaseGovernor.java index d080f0801..3cff13ff6 100644 --- a/src/main/java/io/reactivesocket/UnlimitedLeaseGovernor.java +++ b/src/main/java/io/reactivesocket/lease/UnlimitedLeaseGovernor.java @@ -1,5 +1,7 @@ -package io.reactivesocket; +package io.reactivesocket.lease; +import io.reactivesocket.Frame; +import io.reactivesocket.LeaseGovernor; import io.reactivesocket.internal.Responder; public class UnlimitedLeaseGovernor implements LeaseGovernor { diff --git a/src/main/java/io/reactivesocket/Completable.java b/src/main/java/io/reactivesocket/rx/Completable.java similarity index 79% rename from src/main/java/io/reactivesocket/Completable.java rename to src/main/java/io/reactivesocket/rx/Completable.java index dc1cd4c75..9f87f6a11 100644 --- a/src/main/java/io/reactivesocket/Completable.java +++ b/src/main/java/io/reactivesocket/rx/Completable.java @@ -1,4 +1,4 @@ -package io.reactivesocket; +package io.reactivesocket.rx; public interface Completable { diff --git a/src/main/java/io/reactivesocket/observable/Disposable.java b/src/main/java/io/reactivesocket/rx/Disposable.java similarity index 61% rename from src/main/java/io/reactivesocket/observable/Disposable.java rename to src/main/java/io/reactivesocket/rx/Disposable.java index 5d011fefd..df6efcda7 100644 --- a/src/main/java/io/reactivesocket/observable/Disposable.java +++ b/src/main/java/io/reactivesocket/rx/Disposable.java @@ -1,4 +1,4 @@ -package io.reactivesocket.observable; +package io.reactivesocket.rx; public interface Disposable { diff --git a/src/main/java/io/reactivesocket/observable/Observable.java b/src/main/java/io/reactivesocket/rx/Observable.java similarity index 66% rename from src/main/java/io/reactivesocket/observable/Observable.java rename to src/main/java/io/reactivesocket/rx/Observable.java index 8fe5292ee..9c5d6e39d 100644 --- a/src/main/java/io/reactivesocket/observable/Observable.java +++ b/src/main/java/io/reactivesocket/rx/Observable.java @@ -1,4 +1,4 @@ -package io.reactivesocket.observable; +package io.reactivesocket.rx; public interface Observable { diff --git a/src/main/java/io/reactivesocket/observable/Observer.java b/src/main/java/io/reactivesocket/rx/Observer.java similarity index 81% rename from src/main/java/io/reactivesocket/observable/Observer.java rename to src/main/java/io/reactivesocket/rx/Observer.java index d58a5895c..5a8bafde7 100644 --- a/src/main/java/io/reactivesocket/observable/Observer.java +++ b/src/main/java/io/reactivesocket/rx/Observer.java @@ -1,4 +1,4 @@ -package io.reactivesocket.observable; +package io.reactivesocket.rx; public interface Observer { diff --git a/src/main/java/io/reactivesocket/observable/README.md b/src/main/java/io/reactivesocket/rx/README.md similarity index 61% rename from src/main/java/io/reactivesocket/observable/README.md rename to src/main/java/io/reactivesocket/rx/README.md index 8c190d5b6..e75d96494 100644 --- a/src/main/java/io/reactivesocket/observable/README.md +++ b/src/main/java/io/reactivesocket/rx/README.md @@ -1,3 +1,3 @@ Interfaces for `Observable` that does not support backpressure. -TODO: Decide if we just use concrete types from RxJava 2 once this type exists. (Flowable vs Observable) \ No newline at end of file +TODO: Decide if we just use concrete types from RxJava 2 once this type exists. (Flowable vs Observable) (BenC would prefer this package go away) \ No newline at end of file diff --git a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java index 7e0762d3c..60291f22a 100644 --- a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java +++ b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java @@ -18,6 +18,7 @@ import io.reactivesocket.internal.PublisherUtils; import io.reactivesocket.perfutil.PerfTestConnection; +import io.reactivesocket.rx.Completable; @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) diff --git a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java index ca1c13726..2cf44038a 100644 --- a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java +++ b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java @@ -21,10 +21,10 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import io.reactivesocket.Completable; import io.reactivesocket.DuplexConnection; import io.reactivesocket.Frame; -import io.reactivesocket.observable.Observable; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Observable; public class PerfTestConnection implements DuplexConnection { diff --git a/src/perf/java/io/reactivesocket/perfutil/PerfUnicastSubjectNoBackpressure.java b/src/perf/java/io/reactivesocket/perfutil/PerfUnicastSubjectNoBackpressure.java index 4c768cc7b..32652f949 100644 --- a/src/perf/java/io/reactivesocket/perfutil/PerfUnicastSubjectNoBackpressure.java +++ b/src/perf/java/io/reactivesocket/perfutil/PerfUnicastSubjectNoBackpressure.java @@ -17,9 +17,9 @@ import java.util.function.Consumer; -import io.reactivesocket.observable.Disposable; -import io.reactivesocket.observable.Observable; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.rx.Disposable; +import io.reactivesocket.rx.Observable; +import io.reactivesocket.rx.Observer; /** * The difference between this and the real UnicastSubject is in the `onSubscribe` method where it calls requestN. Not sure that behavior should exist in the producton code. diff --git a/src/test/java/io/reactivesocket/LatchedCompletable.java b/src/test/java/io/reactivesocket/LatchedCompletable.java index 159362b42..e70df1df4 100644 --- a/src/test/java/io/reactivesocket/LatchedCompletable.java +++ b/src/test/java/io/reactivesocket/LatchedCompletable.java @@ -18,6 +18,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import io.reactivesocket.rx.Completable; + public class LatchedCompletable implements Completable { final CountDownLatch latch; diff --git a/src/test/java/io/reactivesocket/ReactiveSocketTest.java b/src/test/java/io/reactivesocket/ReactiveSocketTest.java index f1b0981f1..5dbd3241b 100644 --- a/src/test/java/io/reactivesocket/ReactiveSocketTest.java +++ b/src/test/java/io/reactivesocket/ReactiveSocketTest.java @@ -35,6 +35,7 @@ import org.reactivestreams.Publisher; import io.reactivesocket.internal.PublisherUtils; +import io.reactivesocket.lease.FairLeaseGovernor; import io.reactivex.disposables.Disposable; import io.reactivex.observables.ConnectableObservable; import io.reactivex.subscribers.TestSubscriber; diff --git a/src/test/java/io/reactivesocket/SerializedEventBus.java b/src/test/java/io/reactivesocket/SerializedEventBus.java index 031b0a363..01018b9ee 100644 --- a/src/test/java/io/reactivesocket/SerializedEventBus.java +++ b/src/test/java/io/reactivesocket/SerializedEventBus.java @@ -18,7 +18,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.rx.Observer; import io.reactivex.subjects.PublishSubject; import io.reactivex.subjects.Subject; @@ -66,7 +66,7 @@ public void onComplete() { } @Override - public void onSubscribe(io.reactivesocket.observable.Disposable d) { + public void onSubscribe(io.reactivesocket.rx.Disposable d) { // TODO Auto-generated method stub } diff --git a/src/test/java/io/reactivesocket/TestConnection.java b/src/test/java/io/reactivesocket/TestConnection.java index 2b28c8445..61e1ffc43 100644 --- a/src/test/java/io/reactivesocket/TestConnection.java +++ b/src/test/java/io/reactivesocket/TestConnection.java @@ -21,7 +21,8 @@ import org.reactivestreams.Publisher; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Observer; import io.reactivex.Observable; import io.reactivex.Scheduler.Worker; import io.reactivex.schedulers.Schedulers; @@ -42,14 +43,14 @@ public void addOutput(Publisher o, Completable callback) { } @Override - public io.reactivesocket.observable.Observable getInput() { - return new io.reactivesocket.observable.Observable() { + public io.reactivesocket.rx.Observable getInput() { + return new io.reactivesocket.rx.Observable() { @Override public void subscribe(Observer o) { toInput.add(o); // we are okay with the race of sending data and cancelling ... since this is "hot" by definition and unsubscribing is a race. - o.onSubscribe(new io.reactivesocket.observable.Disposable() { + o.onSubscribe(new io.reactivesocket.rx.Disposable() { @Override public void dispose() { diff --git a/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java b/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java index f512310f8..e70368a36 100644 --- a/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java +++ b/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java @@ -24,6 +24,8 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import io.reactivesocket.rx.Completable; + /** * Connection that by defaults only calls request(1) on a Publisher to addOutput. Any further must be done via requestMore(n) *

diff --git a/src/test/java/io/reactivesocket/TestTransportRequestN.java b/src/test/java/io/reactivesocket/TestTransportRequestN.java index cb9ce1c85..bb3d0b785 100644 --- a/src/test/java/io/reactivesocket/TestTransportRequestN.java +++ b/src/test/java/io/reactivesocket/TestTransportRequestN.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.reactivestreams.Publisher; +import io.reactivesocket.lease.FairLeaseGovernor; import io.reactivex.subscribers.TestSubscriber; /** diff --git a/src/test/java/io/reactivesocket/internal/RequesterTest.java b/src/test/java/io/reactivesocket/internal/RequesterTest.java index b9858003f..56a680218 100644 --- a/src/test/java/io/reactivesocket/internal/RequesterTest.java +++ b/src/test/java/io/reactivesocket/internal/RequesterTest.java @@ -28,13 +28,13 @@ import org.junit.Test; -import io.reactivesocket.Completable; import io.reactivesocket.ConnectionSetupPayload; import io.reactivesocket.Frame; import io.reactivesocket.FrameType; import io.reactivesocket.LatchedCompletable; import io.reactivesocket.Payload; import io.reactivesocket.TestConnection; +import io.reactivesocket.rx.Completable; import io.reactivex.subscribers.TestSubscriber; import io.reactivex.Observable; import io.reactivex.subjects.ReplaySubject; From b3d27e059c42847222f220f58eb40f8b866ef902 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Sep 2015 22:40:26 -0700 Subject: [PATCH 5/5] Revert Fragmentation Changes (not ready for these yet) --- src/main/java/io/reactivesocket/internal/Responder.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivesocket/internal/Responder.java b/src/main/java/io/reactivesocket/internal/Responder.java index 02636cbe8..31dc46af9 100644 --- a/src/main/java/io/reactivesocket/internal/Responder.java +++ b/src/main/java/io/reactivesocket/internal/Responder.java @@ -336,7 +336,7 @@ public void request(long n) { if (n > 0 && started.compareAndSet(false, true)) { final int streamId = requestFrame.getStreamId(); - new FragmentedPublisher(FrameType.NEXT_COMPLETE, streamId, requestHandler.handleRequestResponse(requestFrame)).subscribe(new Subscriber() { + requestHandler.handleRequestResponse(requestFrame).subscribe(new Subscriber() { // event emission is serialized so this doesn't need to be atomic int count = 0; @@ -352,11 +352,12 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(Frame v) { + public void onNext(Payload v) { if (++count > 1) { onError(new IllegalStateException("RequestResponse expects a single onNext")); } else { - child.onNext(v); + + child.onNext(Frame.Response.from(streamId, FrameType.NEXT_COMPLETE, v)); } }