diff --git a/src/main/java/io/reactivesocket/ConnectionSetupPayload.java b/src/main/java/io/reactivesocket/ConnectionSetupPayload.java index f7f77052d..fa6e4dd07 100644 --- a/src/main/java/io/reactivesocket/ConnectionSetupPayload.java +++ b/src/main/java/io/reactivesocket/ConnectionSetupPayload.java @@ -15,10 +15,10 @@ */ package io.reactivesocket; -import io.reactivesocket.internal.SetupFrameFlyweight; - import java.nio.ByteBuffer; +import io.reactivesocket.internal.frame.SetupFrameFlyweight; + /** * Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data */ diff --git a/src/main/java/io/reactivesocket/DuplexConnection.java b/src/main/java/io/reactivesocket/DuplexConnection.java index 41468afb7..e1811ed33 100644 --- a/src/main/java/io/reactivesocket/DuplexConnection.java +++ b/src/main/java/io/reactivesocket/DuplexConnection.java @@ -19,7 +19,8 @@ import org.reactivestreams.Publisher; -import io.reactivesocket.observable.Observable; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Observable; /** * Represents a connection with input/output that the protocol uses. diff --git a/src/main/java/io/reactivesocket/Frame.java b/src/main/java/io/reactivesocket/Frame.java index b9da0ac4d..dd29c8e35 100644 --- a/src/main/java/io/reactivesocket/Frame.java +++ b/src/main/java/io/reactivesocket/Frame.java @@ -16,6 +16,14 @@ package io.reactivesocket; import io.reactivesocket.internal.*; +import io.reactivesocket.internal.frame.ErrorFrameFlyweight; +import io.reactivesocket.internal.frame.FrameHeaderFlyweight; +import io.reactivesocket.internal.frame.FramePool; +import io.reactivesocket.internal.frame.LeaseFrameFlyweight; +import io.reactivesocket.internal.frame.RequestFrameFlyweight; +import io.reactivesocket.internal.frame.RequestNFrameFlyweight; +import io.reactivesocket.internal.frame.SetupFrameFlyweight; +import io.reactivesocket.internal.frame.UnpooledFrame; import uk.co.real_logic.agrona.DirectBuffer; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/main/java/io/reactivesocket/LeaseGovernor.java b/src/main/java/io/reactivesocket/LeaseGovernor.java index eb1220131..854958adc 100644 --- a/src/main/java/io/reactivesocket/LeaseGovernor.java +++ b/src/main/java/io/reactivesocket/LeaseGovernor.java @@ -1,6 +1,8 @@ package io.reactivesocket; import io.reactivesocket.internal.Responder; +import io.reactivesocket.lease.NullLeaseGovernor; +import io.reactivesocket.lease.UnlimitedLeaseGovernor; public interface LeaseGovernor { public static final LeaseGovernor NULL_LEASE_GOVERNOR = new NullLeaseGovernor(); diff --git a/src/main/java/io/reactivesocket/ReactiveSocket.java b/src/main/java/io/reactivesocket/ReactiveSocket.java index 89e52a5f7..a3f6b204d 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -26,13 +26,14 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import io.reactivesocket.internal.CompositeCompletable; -import io.reactivesocket.internal.CompositeDisposable; import io.reactivesocket.internal.Requester; import io.reactivesocket.internal.Responder; -import io.reactivesocket.observable.Disposable; -import io.reactivesocket.observable.Observable; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.internal.rx.CompositeCompletable; +import io.reactivesocket.internal.rx.CompositeDisposable; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; +import io.reactivesocket.rx.Observable; +import io.reactivesocket.rx.Observer; import uk.co.real_logic.agrona.BitUtil; /** diff --git a/src/main/java/io/reactivesocket/exceptions/Exceptions.java b/src/main/java/io/reactivesocket/exceptions/Exceptions.java index 93848cd99..7b2581b80 100644 --- a/src/main/java/io/reactivesocket/exceptions/Exceptions.java +++ b/src/main/java/io/reactivesocket/exceptions/Exceptions.java @@ -19,7 +19,7 @@ import java.nio.ByteBuffer; -import static io.reactivesocket.internal.ErrorFrameFlyweight.*; +import static io.reactivesocket.internal.frame.ErrorFrameFlyweight.*; import static java.nio.charset.StandardCharsets.UTF_8; public class Exceptions { diff --git a/src/main/java/io/reactivesocket/internal/FragmentedPublisher.java b/src/main/java/io/reactivesocket/internal/FragmentedPublisher.java new file mode 100644 index 000000000..4a4f21c6f --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/FragmentedPublisher.java @@ -0,0 +1,57 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.internal; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.reactivesocket.Frame; +import io.reactivesocket.FrameType; +import io.reactivesocket.Payload; +import io.reactivesocket.internal.frame.PayloadFragmenter; + +public class FragmentedPublisher implements Publisher { + + private final PayloadFragmenter fragmenter = new PayloadFragmenter(Frame.METADATA_MTU, Frame.DATA_MTU); + private final Publisher responsePublisher; + private final int streamId; + private final FrameType type; + + public FragmentedPublisher(FrameType type, int streamId, Publisher responsePublisher) { + this.type = type; + this.streamId = streamId; + this.responsePublisher = responsePublisher; + } + + @Override + public void subscribe(Subscriber child) { + child.onSubscribe(new Subscription() { + + @Override + public void request(long n) { + // TODO Auto-generated method stub + + } + + @Override + public void cancel() { + // TODO Auto-generated method stub + + }}); + } + +} diff --git a/src/main/java/io/reactivesocket/internal/PublisherUtils.java b/src/main/java/io/reactivesocket/internal/PublisherUtils.java index cf9b92e4d..78c47343a 100644 --- a/src/main/java/io/reactivesocket/internal/PublisherUtils.java +++ b/src/main/java/io/reactivesocket/internal/PublisherUtils.java @@ -27,6 +27,10 @@ import io.reactivesocket.Frame; import io.reactivesocket.Payload; +import io.reactivesocket.internal.rx.BackpressureHelper; +import io.reactivesocket.internal.rx.BackpressureUtils; +import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.internal.rx.SubscriptionHelper; public class PublisherUtils { diff --git a/src/main/java/io/reactivesocket/internal/Requester.java b/src/main/java/io/reactivesocket/internal/Requester.java index aecdaf030..2ec6b52fe 100644 --- a/src/main/java/io/reactivesocket/internal/Requester.java +++ b/src/main/java/io/reactivesocket/internal/Requester.java @@ -29,7 +29,6 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import io.reactivesocket.Completable; import io.reactivesocket.ConnectionSetupPayload; import io.reactivesocket.DuplexConnection; import io.reactivesocket.Frame; @@ -38,8 +37,13 @@ import io.reactivesocket.exceptions.CancelException; import io.reactivesocket.exceptions.Exceptions; import io.reactivesocket.exceptions.Retryable; -import io.reactivesocket.observable.Disposable; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.internal.frame.RequestFrameFlyweight; +import io.reactivesocket.internal.rx.BackpressureUtils; +import io.reactivesocket.internal.rx.EmptyDisposable; +import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; +import io.reactivesocket.rx.Observer; import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; /** diff --git a/src/main/java/io/reactivesocket/internal/Responder.java b/src/main/java/io/reactivesocket/internal/Responder.java index 186c43f51..31dc46af9 100644 --- a/src/main/java/io/reactivesocket/internal/Responder.java +++ b/src/main/java/io/reactivesocket/internal/Responder.java @@ -28,8 +28,12 @@ import org.reactivestreams.Subscription; import io.reactivesocket.exceptions.SetupException; -import io.reactivesocket.observable.Disposable; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.internal.frame.SetupFrameFlyweight; +import io.reactivesocket.internal.rx.EmptyDisposable; +import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; +import io.reactivesocket.rx.Observer; import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; /** @@ -241,6 +245,7 @@ public void onNext(Frame requestFrame) { final RejectedException exception = new RejectedException("No associated lease"); responsePublisher = PublisherUtils.errorFrame(streamId, exception); } + connection.addOutput(responsePublisher, new Completable() { @Override diff --git a/src/main/java/io/reactivesocket/internal/ByteBufferUtil.java b/src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java similarity index 97% rename from src/main/java/io/reactivesocket/internal/ByteBufferUtil.java rename to src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java index 14e21f99b..b6cc0fa0c 100644 --- a/src/main/java/io/reactivesocket/internal/ByteBufferUtil.java +++ b/src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import java.nio.ByteBuffer; diff --git a/src/main/java/io/reactivesocket/internal/ErrorFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/ErrorFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java index 57f34ef25..40dbcc115 100644 --- a/src/main/java/io/reactivesocket/internal/ErrorFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import io.reactivesocket.exceptions.*; diff --git a/src/main/java/io/reactivesocket/internal/FrameHeaderFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java similarity index 99% rename from src/main/java/io/reactivesocket/internal/FrameHeaderFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java index cb83b2d9e..be6c61a76 100644 --- a/src/main/java/io/reactivesocket/internal/FrameHeaderFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java @@ -13,18 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; import uk.co.real_logic.agrona.DirectBuffer; import uk.co.real_logic.agrona.MutableDirectBuffer; +import static io.reactivesocket.internal.frame.ByteBufferUtil.*; + import java.nio.ByteBuffer; import java.nio.ByteOrder; -import static io.reactivesocket.internal.ByteBufferUtil.preservingSlice; - /** * Per connection frame flyweight. * diff --git a/src/main/java/io/reactivesocket/internal/FramePool.java b/src/main/java/io/reactivesocket/internal/frame/FramePool.java similarity index 96% rename from src/main/java/io/reactivesocket/internal/FramePool.java rename to src/main/java/io/reactivesocket/internal/frame/FramePool.java index 134295c84..4290891cb 100644 --- a/src/main/java/io/reactivesocket/internal/FramePool.java +++ b/src/main/java/io/reactivesocket/internal/frame/FramePool.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/main/java/io/reactivesocket/internal/KeepaliveFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java similarity index 97% rename from src/main/java/io/reactivesocket/internal/KeepaliveFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java index 4b68cd0ad..8e03b8904 100644 --- a/src/main/java/io/reactivesocket/internal/KeepaliveFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.DirectBuffer; diff --git a/src/main/java/io/reactivesocket/internal/LeaseFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/LeaseFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java index af3375ba5..8a663fca5 100644 --- a/src/main/java/io/reactivesocket/internal/LeaseFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; diff --git a/src/main/java/io/reactivesocket/internal/PayloadBuilder.java b/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java similarity index 99% rename from src/main/java/io/reactivesocket/internal/PayloadBuilder.java rename to src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java index 71358f7cb..a068ab96e 100644 --- a/src/main/java/io/reactivesocket/internal/PayloadBuilder.java +++ b/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import io.reactivesocket.Payload; diff --git a/src/main/java/io/reactivesocket/internal/PayloadFragmenter.java b/src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java similarity index 99% rename from src/main/java/io/reactivesocket/internal/PayloadFragmenter.java rename to src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java index 7fc9d5062..124245726 100644 --- a/src/main/java/io/reactivesocket/internal/PayloadFragmenter.java +++ b/src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import io.reactivesocket.FrameType; diff --git a/src/main/java/io/reactivesocket/internal/PayloadReassembler.java b/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/PayloadReassembler.java rename to src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java index 4598b8415..3f5f5ea7b 100644 --- a/src/main/java/io/reactivesocket/internal/PayloadReassembler.java +++ b/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java @@ -13,10 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import io.reactivesocket.Payload; + import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; diff --git a/src/main/java/io/reactivesocket/internal/RequestFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/RequestFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java index f1ad7b6af..88e67db1c 100644 --- a/src/main/java/io/reactivesocket/internal/RequestFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; diff --git a/src/main/java/io/reactivesocket/internal/RequestNFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/RequestNFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java index e56cbfde5..7e0f4dea1 100644 --- a/src/main/java/io/reactivesocket/internal/RequestNFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; diff --git a/src/main/java/io/reactivesocket/internal/SetupFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java similarity index 99% rename from src/main/java/io/reactivesocket/internal/SetupFrameFlyweight.java rename to src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java index 18e89f87c..bebe4b34c 100644 --- a/src/main/java/io/reactivesocket/internal/SetupFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; import uk.co.real_logic.agrona.BitUtil; diff --git a/src/main/java/io/reactivesocket/internal/ThreadLocalFramePool.java b/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/ThreadLocalFramePool.java rename to src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java index e2250872f..6a4420246 100644 --- a/src/main/java/io/reactivesocket/internal/ThreadLocalFramePool.java +++ b/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java b/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java rename to src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java index f990e3bf3..8842a38d7 100644 --- a/src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java +++ b/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/main/java/io/reactivesocket/internal/UnpooledFrame.java b/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java similarity index 97% rename from src/main/java/io/reactivesocket/internal/UnpooledFrame.java rename to src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java index 739f626fe..c5de2775e 100644 --- a/src/main/java/io/reactivesocket/internal/UnpooledFrame.java +++ b/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; import uk.co.real_logic.agrona.MutableDirectBuffer; diff --git a/src/main/java/io/reactivesocket/internal/rx/AppendOnlyLinkedArrayList.java b/src/main/java/io/reactivesocket/internal/rx/AppendOnlyLinkedArrayList.java new file mode 100644 index 000000000..0b1ee24b7 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/AppendOnlyLinkedArrayList.java @@ -0,0 +1,125 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal.rx; + +import java.util.function.*; + +/** + * A linked-array-list implementation that only supports appending and consumption. + * + * @param the value type + */ +public class AppendOnlyLinkedArrayList { + final int capacity; + Object[] head; + Object[] tail; + int offset; + + /** + * Constructs an empty list with a per-link capacity + * @param capacity the capacity of each link + */ + public AppendOnlyLinkedArrayList(int capacity) { + this.capacity = capacity; + this.head = new Object[capacity + 1]; + this.tail = head; + } + + /** + * Append a non-null value to the list. + *

Don't add null to the list! + * @param value the value to append + */ + public void add(T value) { + final int c = capacity; + int o = offset; + if (o == c) { + Object[] next = new Object[c + 1]; + tail[c] = next; + tail = next; + o = 0; + } + tail[o] = value; + offset = o + 1; + } + + /** + * Set a value as the first element of the list. + * @param value the value to set + */ + public void setFirst(T value) { + head[0] = value; + } + + /** + * Loops through all elements of the list. + * @param consumer the consumer of elements + */ + @SuppressWarnings("unchecked") + public void forEach(Consumer consumer) { + Object[] a = head; + final int c = capacity; + while (a != null) { + for (int i = 0; i < c; i++) { + Object o = a[i]; + if (o == null) { + return; + } + consumer.accept((T)o); + } + a = (Object[])a[c]; + } + } + + /** + * Loops over all elements of the array until a null element is encountered or + * the given predicate returns true. + * @param consumer the consumer of values that returns true if the forEach should terminate + */ + @SuppressWarnings("unchecked") + public void forEachWhile(Predicate consumer) { + Object[] a = head; + final int c = capacity; + while (a != null) { + for (int i = 0; i < c; i++) { + Object o = a[i]; + if (o == null) { + return; + } + if (consumer.test((T)o)) { + return; + } + } + a = (Object[])a[c]; + } + } + + @SuppressWarnings("unchecked") + public void forEachWhile(S state, BiPredicate consumer) { + Object[] a = head; + final int c = capacity; + while (a != null) { + for (int i = 0; i < c; i++) { + Object o = a[i]; + if (o == null) { + return; + } + if (consumer.test(state, (T)o)) { + return; + } + } + a = (Object[])a[c]; + } + } +} diff --git a/src/main/java/io/reactivesocket/internal/BackpressureHelper.java b/src/main/java/io/reactivesocket/internal/rx/BackpressureHelper.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/BackpressureHelper.java rename to src/main/java/io/reactivesocket/internal/rx/BackpressureHelper.java index bee59b440..cd51ac01b 100644 --- a/src/main/java/io/reactivesocket/internal/BackpressureHelper.java +++ b/src/main/java/io/reactivesocket/internal/rx/BackpressureHelper.java @@ -10,7 +10,7 @@ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import java.util.concurrent.atomic.*; diff --git a/src/main/java/io/reactivesocket/internal/BackpressureUtils.java b/src/main/java/io/reactivesocket/internal/rx/BackpressureUtils.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/BackpressureUtils.java rename to src/main/java/io/reactivesocket/internal/rx/BackpressureUtils.java index b8ce796f7..d67598d1f 100644 --- a/src/main/java/io/reactivesocket/internal/BackpressureUtils.java +++ b/src/main/java/io/reactivesocket/internal/rx/BackpressureUtils.java @@ -1,4 +1,4 @@ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; /** * Copyright 2015 Netflix, Inc. diff --git a/src/main/java/io/reactivesocket/internal/rx/BaseArrayQueue.java b/src/main/java/io/reactivesocket/internal/rx/BaseArrayQueue.java new file mode 100644 index 000000000..214aa00b2 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/BaseArrayQueue.java @@ -0,0 +1,131 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReferenceArray; + +abstract class BaseArrayQueue extends AtomicReferenceArray implements Queue { + /** */ + private static final long serialVersionUID = 5238363267841964068L; + protected final int mask; + public BaseArrayQueue(int capacity) { + super(Pow2.roundToPowerOfTwo(capacity)); + this.mask = length() - 1; + } + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + @Override + public void clear() { + // we have to test isEmpty because of the weaker poll() guarantee + while (poll() != null || !isEmpty()) + ; + } + protected final int calcElementOffset(long index, int mask) { + return (int)index & mask; + } + protected final int calcElementOffset(long index) { + return (int)index & mask; + } + protected final E lvElement(AtomicReferenceArray buffer, int offset) { + return buffer.get(offset); + } + protected final E lpElement(AtomicReferenceArray buffer, int offset) { + return buffer.get(offset); // no weaker form available + } + protected final E lpElement(int offset) { + return get(offset); // no weaker form available + } + protected final void spElement(AtomicReferenceArray buffer, int offset, E value) { + buffer.lazySet(offset, value); // no weaker form available + } + protected final void spElement(int offset, E value) { + lazySet(offset, value); // no weaker form available + } + protected final void soElement(AtomicReferenceArray buffer, int offset, E value) { + buffer.lazySet(offset, value); + } + protected final void soElement(int offset, E value) { + lazySet(offset, value); + } + protected final void svElement(AtomicReferenceArray buffer, int offset, E value) { + buffer.set(offset, value); + } + protected final E lvElement(int offset) { + return get(offset); + } + + @Override + public boolean add(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public E remove() { + throw new UnsupportedOperationException(); + } + + @Override + public E element() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } +} + diff --git a/src/main/java/io/reactivesocket/internal/rx/BaseLinkedQueue.java b/src/main/java/io/reactivesocket/internal/rx/BaseLinkedQueue.java new file mode 100644 index 000000000..bc1047ae2 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/BaseLinkedQueue.java @@ -0,0 +1,94 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + +abstract class BaseLinkedQueue extends AbstractQueue { + private final AtomicReference> producerNode; + private final AtomicReference> consumerNode; + public BaseLinkedQueue() { + producerNode = new AtomicReference<>(); + consumerNode = new AtomicReference<>(); + } + protected final LinkedQueueNode lvProducerNode() { + return producerNode.get(); + } + protected final LinkedQueueNode lpProducerNode() { + return producerNode.get(); + } + protected final void spProducerNode(LinkedQueueNode node) { + producerNode.lazySet(node); + } + protected final LinkedQueueNode xchgProducerNode(LinkedQueueNode node) { + return producerNode.getAndSet(node); + } + protected final LinkedQueueNode lvConsumerNode() { + return consumerNode.get(); + } + + protected final LinkedQueueNode lpConsumerNode() { + return consumerNode.get(); + } + protected final void spConsumerNode(LinkedQueueNode node) { + consumerNode.lazySet(node); + } + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * This is an O(n) operation as we run through all the nodes and count them.
+ * + * @see java.util.Queue#size() + */ + @Override + public final int size() { + LinkedQueueNode chaserNode = lvConsumerNode(); + final LinkedQueueNode producerNode = lvProducerNode(); + int size = 0; + // must chase the nodes all the way to the producer node, but there's no need to chase a moving target. + while (chaserNode != producerNode && size < Integer.MAX_VALUE) { + LinkedQueueNode next; + while((next = chaserNode.lvNext()) == null); + chaserNode = next; + size++; + } + return size; + } + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe + * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to + * be null. + * + * @see MessagePassingQueue#isEmpty() + */ + @Override + public final boolean isEmpty() { + return lvConsumerNode() == lvProducerNode(); + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivesocket/internal/BooleanDisposable.java b/src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java similarity index 91% rename from src/main/java/io/reactivesocket/internal/BooleanDisposable.java rename to src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java index 17f75124f..6e4b70d27 100644 --- a/src/main/java/io/reactivesocket/internal/BooleanDisposable.java +++ b/src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java @@ -1,8 +1,8 @@ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import io.reactivesocket.observable.Disposable; +import io.reactivesocket.rx.Disposable; public final class BooleanDisposable implements Disposable { volatile Runnable run; diff --git a/src/main/java/io/reactivesocket/internal/CompositeCompletable.java b/src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java similarity index 94% rename from src/main/java/io/reactivesocket/internal/CompositeCompletable.java rename to src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java index 145853732..7d4f4a020 100644 --- a/src/main/java/io/reactivesocket/internal/CompositeCompletable.java +++ b/src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java @@ -1,9 +1,9 @@ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import java.util.HashSet; import java.util.Set; -import io.reactivesocket.Completable; +import io.reactivesocket.rx.Completable; /** * A Completable container that can hold onto multiple other Completables. diff --git a/src/main/java/io/reactivesocket/internal/CompositeDisposable.java b/src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java similarity index 88% rename from src/main/java/io/reactivesocket/internal/CompositeDisposable.java rename to src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java index 506007d3f..f8cbf5b1b 100644 --- a/src/main/java/io/reactivesocket/internal/CompositeDisposable.java +++ b/src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java @@ -1,10 +1,10 @@ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import java.util.HashSet; import java.util.Set; -import io.reactivesocket.Completable; -import io.reactivesocket.observable.Disposable; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; /** * A Disposable container that can hold onto multiple other Disposables. diff --git a/src/main/java/io/reactivesocket/internal/EmptyDisposable.java b/src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java similarity index 90% rename from src/main/java/io/reactivesocket/internal/EmptyDisposable.java rename to src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java index e878d81a6..785fd7856 100644 --- a/src/main/java/io/reactivesocket/internal/EmptyDisposable.java +++ b/src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; -import io.reactivesocket.observable.Disposable; +import io.reactivesocket.rx.Disposable; public class EmptyDisposable implements Disposable { diff --git a/src/main/java/io/reactivesocket/internal/EmptySubscription.java b/src/main/java/io/reactivesocket/internal/rx/EmptySubscription.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/EmptySubscription.java rename to src/main/java/io/reactivesocket/internal/rx/EmptySubscription.java index cb2c58581..fe2c38685 100644 --- a/src/main/java/io/reactivesocket/internal/EmptySubscription.java +++ b/src/main/java/io/reactivesocket/internal/rx/EmptySubscription.java @@ -11,7 +11,7 @@ * the License for the specific language governing permissions and limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import org.reactivestreams.*; diff --git a/src/main/java/io/reactivesocket/internal/rx/LinkedQueueNode.java b/src/main/java/io/reactivesocket/internal/rx/LinkedQueueNode.java new file mode 100644 index 000000000..bc03d6f0c --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/LinkedQueueNode.java @@ -0,0 +1,58 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.concurrent.atomic.AtomicReference; + +public final class LinkedQueueNode extends AtomicReference> { + /** */ + private static final long serialVersionUID = 2404266111789071508L; + private E value; + LinkedQueueNode() { + } + LinkedQueueNode(E val) { + spValue(val); + } + /** + * Gets the current value and nulls out the reference to it from this node. + * + * @return value + */ + public E getAndNullValue() { + E temp = lpValue(); + spValue(null); + return temp; + } + + public E lpValue() { + return value; + } + + public void spValue(E newValue) { + value = newValue; + } + + public void soNext(LinkedQueueNode n) { + lazySet(n); + } + + public LinkedQueueNode lvNext() { + return get(); + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivesocket/internal/rx/MpscLinkedQueue.java b/src/main/java/io/reactivesocket/internal/rx/MpscLinkedQueue.java new file mode 100644 index 000000000..45741ac38 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/MpscLinkedQueue.java @@ -0,0 +1,112 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +/** + * A multi-producer single consumer unbounded queue. + */ +public final class MpscLinkedQueue extends BaseLinkedQueue { + + public MpscLinkedQueue() { + super(); + LinkedQueueNode node = new LinkedQueueNode<>(); + spConsumerNode(node); + xchgProducerNode(node);// this ensures correct construction: StoreLoad + } + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Offer is allowed from multiple threads.
+ * Offer allocates a new node and: + *

    + *
  1. Swaps it atomically with current producer node (only one producer 'wins') + *
  2. Sets the new node as the node following from the swapped producer node + *
+ * This works because each producer is guaranteed to 'plant' a new node and link the old node. No 2 producers can + * get the same producer node as part of XCHG guarantee. + * + * @see MessagePassingQueue#offer(Object) + * @see java.util.Queue#offer(java.lang.Object) + */ + @Override + public final boolean offer(final T nextValue) { + final LinkedQueueNode nextNode = new LinkedQueueNode<>(nextValue); + final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode); + // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed + // and completes the store in prev.next. + prevProducerNode.soNext(nextNode); // StoreStore + return true; + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Poll is allowed from a SINGLE thread.
+ * Poll reads the next node from the consumerNode and: + *

    + *
  1. If it is null, the queue is assumed empty (though it might not be). + *
  2. If it is not null set it as the consumer node and return it's now evacuated value. + *
+ * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null + * values are not allowed to be offered this is the only node with it's value set to null at any one time. + * + * @see MessagePassingQueue#poll() + * @see java.util.Queue#poll() + */ + @Override + public final T poll() { + LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright + LinkedQueueNode nextNode = currConsumerNode.lvNext(); + if (nextNode != null) { + // we have to null out the value because we are going to hang on to the node + final T nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; + } + else if (currConsumerNode != lvProducerNode()) { + // spin, we are no longer wait free + while((nextNode = currConsumerNode.lvNext()) == null); + // got the next node... + + // we have to null out the value because we are going to hang on to the node + final T nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; + } + return null; + } + + @Override + public final T peek() { + LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright + LinkedQueueNode nextNode = currConsumerNode.lvNext(); + if (nextNode != null) { + return nextNode.lpValue(); + } else + if (currConsumerNode != lvProducerNode()) { + // spin, we are no longer wait free + while ((nextNode = currConsumerNode.lvNext()) == null); + // got the next node... + return nextNode.lpValue(); + } + return null; + } +} diff --git a/src/main/java/io/reactivesocket/internal/rx/NotificationLite.java b/src/main/java/io/reactivesocket/internal/rx/NotificationLite.java new file mode 100644 index 000000000..2091ed836 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/NotificationLite.java @@ -0,0 +1,207 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.internal.rx; + +import java.io.Serializable; + +import org.reactivestreams.*; + +/** + * Lightweight notification handling utility class. + */ +public enum NotificationLite { + // No instances + ; + + /** + * Indicates a completion notification. + */ + private enum Complete { + INSTANCE; + @Override + public String toString() { + return "NotificationLite.Complete"; + }; + } + + /** + * Wraps a Throwable. + */ + private static final class ErrorNotification implements Serializable { + /** */ + private static final long serialVersionUID = -8759979445933046293L; + final Throwable e; + ErrorNotification(Throwable e) { + this.e = e; + } + + @Override + public String toString() { + return "NotificationLite.Error[" + e + "]"; + } + } + + /** + * Wraps a Subscription. + */ + private static final class SubscriptionNotification implements Serializable { + /** */ + private static final long serialVersionUID = -1322257508628817540L; + final Subscription s; + SubscriptionNotification(Subscription s) { + this.s = s; + } + + @Override + public String toString() { + return "NotificationLite.Subscription[" + s + "]"; + } + } + + /** + * Converts a value into a notification value. + * @param value the value to convert + * @return the notification representing the value + */ + public static Object next(T value) { + return value; + } + + /** + * Returns a complete notification. + * @return a complete notification + */ + public static Object complete() { + return Complete.INSTANCE; + } + + /** + * Converts a Throwable into a notification value. + * @param e the Throwable to convert + * @return the notification representing the Throwable + */ + public static Object error(Throwable e) { + return new ErrorNotification(e); + } + + /** + * Converts a Subscription into a notification value. + * @param e the Subscription to convert + * @return the notification representing the Subscription + */ + public static Object subscription(Subscription s) { + return new SubscriptionNotification(s); + } + + /** + * Checks if the given object represents a complete notification. + * @param o the object to check + * @return true if the object represents a complete notification + */ + public static boolean isComplete(Object o) { + return o == Complete.INSTANCE; + } + + /** + * Checks if the given object represents a error notification. + * @param o the object to check + * @return true if the object represents a error notification + */ + public static boolean isError(Object o) { + return o instanceof ErrorNotification; + } + + /** + * Checks if the given object represents a subscription notification. + * @param o the object to check + * @return true if the object represents a subscription notification + */ + public static boolean isSubscription(Object o) { + return o instanceof SubscriptionNotification; + } + + /** + * Extracts the value from the notification object + * @param o the notification object + * @return the extracted value + */ + @SuppressWarnings("unchecked") + public static T getValue(Object o) { + return (T)o; + } + + /** + * Extracts the Throwable from the notification object + * @param o the notification object + * @return the extracted Throwable + */ + public static Throwable getError(Object o) { + return ((ErrorNotification)o).e; + } + + /** + * Extracts the Subscription from the notification object + * @param o the notification object + * @return the extracted Subscription + */ + public static Subscription getSubscription(Object o) { + return ((SubscriptionNotification)o).s; + } + + /** + * Calls the appropriate Subscriber method based on the type of the notification. + *

Does not check for a subscription notification, see {@link #acceptFull(Object, Subscriber)}. + * @param o the notification object + * @param s the subscriber to call methods on + * @return true if the notification was a terminal event (i.e., complete or error) + * @see #acceptFull(Object, Subscriber) + */ + @SuppressWarnings("unchecked") + public static boolean accept(Object o, Subscriber s) { + if (o == Complete.INSTANCE) { + s.onComplete(); + return true; + } else + if (o instanceof ErrorNotification) { + s.onError(((ErrorNotification)o).e); + return true; + } + s.onNext((T)o); + return false; + } + + /** + * Calls the appropriate Subscriber method based on the type of the notification. + * @param o the notification object + * @param s the subscriber to call methods on + * @return true if the notification was a terminal event (i.e., complete or error) + * @see #accept(Object, Subscriber) + */ + @SuppressWarnings("unchecked") + public static boolean acceptFull(Object o, Subscriber s) { + if (o == Complete.INSTANCE) { + s.onComplete(); + return true; + } else + if (o instanceof ErrorNotification) { + s.onError(((ErrorNotification)o).e); + return true; + } else + if (o instanceof SubscriptionNotification) { + s.onSubscribe(((SubscriptionNotification)o).s); + return false; + } + s.onNext((T)o); + return false; + } +} diff --git a/src/main/java/io/reactivesocket/internal/rx/OperatorConcatMap.java b/src/main/java/io/reactivesocket/internal/rx/OperatorConcatMap.java new file mode 100644 index 000000000..641d7cf2b --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/OperatorConcatMap.java @@ -0,0 +1,202 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.internal.rx; + +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import org.reactivestreams.*; + +public final class OperatorConcatMap { + final Function> mapper; + final int bufferSize; + public OperatorConcatMap(Function> mapper, int bufferSize) { + this.mapper = mapper; + this.bufferSize = bufferSize; + } + + public Subscriber apply(Subscriber s) { + SerializedSubscriber ssub = new SerializedSubscriber<>(s); + SubscriptionArbiter sa = new SubscriptionArbiter(); + ssub.onSubscribe(sa); + return new SourceSubscriber<>(ssub, sa, mapper, bufferSize); + } + + static final class SourceSubscriber extends AtomicInteger implements Subscriber { + /** */ + private static final long serialVersionUID = 8828587559905699186L; + final Subscriber actual; + final SubscriptionArbiter sa; + final Function> mapper; + final Subscriber inner; + final Queue queue; + final int bufferSize; + + Subscription s; + + volatile boolean done; + + volatile long index; + + public SourceSubscriber(Subscriber actual, SubscriptionArbiter sa, + Function> mapper, int bufferSize) { + this.actual = actual; + this.sa = sa; + this.mapper = mapper; + this.bufferSize = bufferSize; + this.inner = new InnerSubscriber<>(actual, sa, this); + Queue q; + if (Pow2.isPowerOfTwo(bufferSize)) { + q = new SpscArrayQueue<>(bufferSize); + } else { + q = new SpscExactArrayQueue<>(bufferSize); + } + this.queue = q; + } + @Override + public void onSubscribe(Subscription s) { + if (this.s != null) { + s.cancel(); + return; + } + this.s = s; + s.request(bufferSize); + } + @Override + public void onNext(T t) { + if (done) { + return; + } + if (!queue.offer(t)) { + cancel(); + actual.onError(new IllegalStateException("More values received than requested!")); + return; + } + if (getAndIncrement() == 0) { + drain(); + } + } + @Override + public void onError(Throwable t) { + if (done) { + return; + } + done = true; + cancel(); + actual.onError(t); + } + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + if (getAndIncrement() == 0) { + drain(); + } + } + + void innerComplete() { + if (decrementAndGet() != 0) { + drain(); + } + if (!done) { + s.request(1); + } + } + + void cancel() { + sa.cancel(); + s.cancel(); + } + + void drain() { + boolean d = done; + T o = queue.poll(); + + if (o == null) { + if (d) { + actual.onComplete(); + return; + } + return; + } + Publisher p; + try { + p = mapper.apply(o); + } catch (Throwable e) { + cancel(); + actual.onError(e); + return; + } + index++; + // this is not RS but since our Subscriber doesn't hold state by itself, + // subscribing it to each source is safe and saves allocation + p.subscribe(inner); + } + } + + static final class InnerSubscriber implements Subscriber { + final Subscriber actual; + final SubscriptionArbiter sa; + final SourceSubscriber parent; + + /* + * FIXME this is a workaround for now, but doesn't work + * for async non-conforming sources. + * Such sources require individual instances of InnerSubscriber and a + * done field. + */ + + long index; + + public InnerSubscriber(Subscriber actual, + SubscriptionArbiter sa, SourceSubscriber parent) { + this.actual = actual; + this.sa = sa; + this.parent = parent; + this.index = 1; + } + + @Override + public void onSubscribe(Subscription s) { + if (index == parent.index) { + sa.setSubscription(s); + } + } + + @Override + public void onNext(U t) { + if (index == parent.index) { + actual.onNext(t); + sa.produced(1L); + } + } + @Override + public void onError(Throwable t) { + if (index == parent.index) { + index++; + parent.cancel(); + actual.onError(t); + } + } + @Override + public void onComplete() { + if (index == parent.index) { + index++; + parent.innerComplete(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivesocket/internal/rx/Pow2.java b/src/main/java/io/reactivesocket/internal/rx/Pow2.java new file mode 100644 index 000000000..332144a26 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/Pow2.java @@ -0,0 +1,46 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + + +/* + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/util/Pow2.java + */ +package io.reactivesocket.internal.rx; + +public final class Pow2 { + private Pow2() { + throw new IllegalStateException("No instances!"); + } + + /** + * Find the next larger positive power of two value up from the given value. If value is a power of two then + * this value will be returned. + * + * @param value from which next positive power of two will be found. + * @return the next positive power of 2 or this value if it is a power of 2. + */ + public static int roundToPowerOfTwo(final int value) { + return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); + } + + /** + * Is this value a power of two. + * + * @param value to be tested to see if it is a power of two. + * @return true if the value is a power of 2 otherwise false. + */ + public static boolean isPowerOfTwo(final int value) { + return (value & (value - 1)) == 0; + } +} diff --git a/src/main/java/io/reactivesocket/internal/rx/QueueDrainHelper.java b/src/main/java/io/reactivesocket/internal/rx/QueueDrainHelper.java new file mode 100644 index 000000000..fbafaff75 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/QueueDrainHelper.java @@ -0,0 +1,280 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.internal.rx; + +import java.util.concurrent.atomic.*; +import java.util.function.BooleanSupplier; + +/** + * Utility class to help with the queue-drain serialization idiom. + */ +public enum QueueDrainHelper { + ; + + /** + * A fast-path queue-drain serialization logic. + *

The decrementing of the state is left to the drain callback. + * @param updater + * @param instance + * @param fastPath called if the instance is uncontended. + * @param queue called if the instance is contended to queue up work + * @param drain called if the instance transitions to the drain state successfully + */ + public static void queueDrain(AtomicIntegerFieldUpdater updater, T instance, + Runnable fastPath, Runnable queue, Runnable drain) { + if (updater.get(instance) == 0 && updater.compareAndSet(instance, 0, 1)) { + fastPath.run(); + if (updater.decrementAndGet(instance) == 0) { + return; + } + } else { + queue.run(); + if (updater.getAndIncrement(instance) != 0) { + return; + } + } + drain.run(); + } + + /** + * A fast-path queue-drain serialization logic with the ability to leave the state + * in fastpath/drain mode or not continue after the call to queue. + *

The decrementing of the state is left to the drain callback. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainIf(AtomicIntegerFieldUpdater updater, T instance, + BooleanSupplier fastPath, BooleanSupplier queue, Runnable drain) { + if (updater.get(instance) == 0 && updater.compareAndSet(instance, 0, 1)) { + if (fastPath.getAsBoolean()) { + return; + } + if (updater.decrementAndGet(instance) == 0) { + return; + } + } else { + if (queue.getAsBoolean()) { + return; + } + if (updater.getAndIncrement(instance) != 0) { + return; + } + } + drain.run(); + } + + /** + * A fast-path queue-drain serialization logic where the drain is looped until + * the instance state reaches 0 again. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainLoop(AtomicIntegerFieldUpdater updater, T instance, + Runnable fastPath, Runnable queue, Runnable drain) { + if (updater.get(instance) == 0 && updater.compareAndSet(instance, 0, 1)) { + fastPath.run(); + if (updater.decrementAndGet(instance) == 0) { + return; + } + } else { + queue.run(); + if (updater.getAndIncrement(instance) != 0) { + return; + } + } + int missed = 1; + for (;;) { + drain.run(); + + missed = updater.addAndGet(instance, -missed); + if (missed == 0) { + return; + } + } + } + + /** + * A fast-path queue-drain serialization logic with looped drain call and the ability to leave the state + * in fastpath/drain mode or not continue after the call to queue. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainLoopIf(AtomicIntegerFieldUpdater updater, T instance, + BooleanSupplier fastPath, BooleanSupplier queue, BooleanSupplier drain) { + if (updater.get(instance) == 0 && updater.compareAndSet(instance, 0, 1)) { + if (fastPath.getAsBoolean()) { + return; + } + if (updater.decrementAndGet(instance) == 0) { + return; + } + } else { + if (queue.getAsBoolean()) { + return; + } + if (updater.getAndIncrement(instance) != 0) { + return; + } + } + int missed = 1; + for (;;) { + + if (drain.getAsBoolean()) { + return; + } + + missed = updater.addAndGet(instance, -missed); + if (missed == 0) { + return; + } + } + } + + /** + * A fast-path queue-drain serialization logic. + *

The decrementing of the state is left to the drain callback. + * @param updater + * @param instance + * @param fastPath called if the instance is uncontended. + * @param queue called if the instance is contended to queue up work + * @param drain called if the instance transitions to the drain state successfully + */ + public static void queueDrain(AtomicInteger instance, + Runnable fastPath, Runnable queue, Runnable drain) { + if (instance.get() == 0 && instance.compareAndSet(0, 1)) { + fastPath.run(); + if (instance.decrementAndGet() == 0) { + return; + } + } else { + queue.run(); + if (instance.getAndIncrement() != 0) { + return; + } + } + drain.run(); + } + + /** + * A fast-path queue-drain serialization logic with the ability to leave the state + * in fastpath/drain mode or not continue after the call to queue. + *

The decrementing of the state is left to the drain callback. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainIf(AtomicInteger instance, + BooleanSupplier fastPath, BooleanSupplier queue, Runnable drain) { + if (instance.get() == 0 && instance.compareAndSet(0, 1)) { + if (fastPath.getAsBoolean()) { + return; + } + if (instance.decrementAndGet() == 0) { + return; + } + } else { + if (queue.getAsBoolean()) { + return; + } + if (instance.getAndIncrement() != 0) { + return; + } + } + drain.run(); + } + + /** + * A fast-path queue-drain serialization logic where the drain is looped until + * the instance state reaches 0 again. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainLoop(AtomicInteger instance, + Runnable fastPath, Runnable queue, Runnable drain) { + if (instance.get() == 0 && instance.compareAndSet(0, 1)) { + fastPath.run(); + if (instance.decrementAndGet() == 0) { + return; + } + } else { + queue.run(); + if (instance.getAndIncrement() != 0) { + return; + } + } + int missed = 1; + for (;;) { + drain.run(); + + missed = instance.addAndGet(-missed); + if (missed == 0) { + return; + } + } + } + + /** + * A fast-path queue-drain serialization logic with looped drain call and the ability to leave the state + * in fastpath/drain mode or not continue after the call to queue. + * @param updater + * @param instance + * @param fastPath + * @param queue + * @param drain + */ + public static void queueDrainLoopIf(AtomicInteger instance, + BooleanSupplier fastPath, BooleanSupplier queue, BooleanSupplier drain) { + if (instance.get() == 0 && instance.compareAndSet(0, 1)) { + if (fastPath.getAsBoolean()) { + return; + } + if (instance.decrementAndGet() == 0) { + return; + } + } else { + if (queue.getAsBoolean()) { + return; + } + if (instance.getAndIncrement() != 0) { + return; + } + } + int missed = 1; + for (;;) { + + if (drain.getAsBoolean()) { + return; + } + + missed = instance.addAndGet(-missed); + if (missed == 0) { + return; + } + } + } + +} diff --git a/src/main/java/io/reactivesocket/internal/rx/README.md b/src/main/java/io/reactivesocket/internal/rx/README.md new file mode 100644 index 000000000..dc1b56023 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/README.md @@ -0,0 +1,3 @@ +RxJava v2 code copy/pasted to here since RxJava v2 is not yet ready to be depended upon (still in design flux, rapid code changes, not even a developer preview on Maven Central yet). + +Someday this package should theoretically go away and RxJava v2 directly used. \ No newline at end of file diff --git a/src/main/java/io/reactivesocket/internal/rx/SerializedSubscriber.java b/src/main/java/io/reactivesocket/internal/rx/SerializedSubscriber.java new file mode 100644 index 000000000..fab2efcca --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/SerializedSubscriber.java @@ -0,0 +1,176 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.internal.rx; + +import org.reactivestreams.*; + + +/** + * Serializes access to the onNext, onError and onComplete methods of another Subscriber. + * + *

Note that onSubscribe is not serialized in respect of the other methods so + * make sure the Subscription is set before any of the other methods are called. + * + *

The implementation assumes that the actual Subscriber's methods don't throw. + * + * @param the value type + */ +public final class SerializedSubscriber implements Subscriber { + final Subscriber actual; + final boolean delayError; + + static final int QUEUE_LINK_SIZE = 4; + + Subscription subscription; + + boolean emitting; + AppendOnlyLinkedArrayList queue; + + volatile boolean done; + + public SerializedSubscriber(Subscriber actual) { + this(actual, false); + } + + public SerializedSubscriber(Subscriber actual, boolean delayError) { + this.actual = actual; + this.delayError = delayError; + } + @Override + public void onSubscribe(Subscription s) { + if (subscription != null) { + s.cancel(); + onError(new IllegalStateException("Subscription already set!")); + return; + } + this.subscription = s; + + actual.onSubscribe(s); + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + if (t == null) { + subscription.cancel(); + onError(new NullPointerException()); + return; + } + synchronized (this) { + if (done) { + return; + } + if (emitting) { + AppendOnlyLinkedArrayList q = queue; + if (q == null) { + q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE); + queue = q; + } + q.add(NotificationLite.next(t)); + return; + } + emitting = true; + } + + actual.onNext(t); + + emitLoop(); + } + + @Override + public void onError(Throwable t) { + if (done) { + return; + } + boolean reportError; + synchronized (this) { + if (done) { + reportError = true; + } else + if (emitting) { + done = true; + AppendOnlyLinkedArrayList q = queue; + if (q == null) { + q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE); + queue = q; + } + Object err = NotificationLite.error(t); + if (delayError) { + q.add(err); + } else { + q.setFirst(err); + } + return; + } else { + done = true; + emitting = true; + reportError = false; + } + } + + if (reportError) { + return; + } + + actual.onError(t); + // no need to loop because this onError is the last event + } + + @Override + public void onComplete() { + if (done) { + return; + } + synchronized (this) { + if (done) { + return; + } + if (emitting) { + AppendOnlyLinkedArrayList q = queue; + if (q == null) { + q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE); + queue = q; + } + q.add(NotificationLite.complete()); + return; + } + done = true; + emitting = true; + } + + actual.onComplete(); + // no need to loop because this onComplete is the last event + } + + void emitLoop() { + for (;;) { + AppendOnlyLinkedArrayList q; + synchronized (this) { + q = queue; + if (q == null) { + emitting = false; + return; + } + queue = null; + } + + q.forEachWhile(this::accept); + } + } + + boolean accept(Object value) { + return NotificationLite.accept(value, actual); + } +} diff --git a/src/main/java/io/reactivesocket/internal/rx/SpscArrayQueue.java b/src/main/java/io/reactivesocket/internal/rx/SpscArrayQueue.java new file mode 100644 index 000000000..348551e68 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/SpscArrayQueue.java @@ -0,0 +1,133 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer. + *

+ * This implementation is a mashup of the Fast Flow + * algorithm with an optimization of the offer method taken from the BQueue algorithm (a variation on Fast + * Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.
+ * For convenience the relevant papers are available in the resources folder:
+ * 2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf
+ * 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf
+ *
This implementation is wait free. + * + * @param + */ +public final class SpscArrayQueue extends BaseArrayQueue { + /** */ + private static final long serialVersionUID = -1296597691183856449L; + private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); + final AtomicLong producerIndex; + protected long producerLookAhead; + final AtomicLong consumerIndex; + final int lookAheadStep; + public SpscArrayQueue(int capacity) { + super(capacity); + this.producerIndex = new AtomicLong(); + this.consumerIndex = new AtomicLong(); + lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); + } + + @Override + public boolean offer(E e) { + if (null == e) { + throw new NullPointerException("Null is not a valid element"); + } + // local load of field to avoid repeated loads after volatile reads + final int mask = this.mask; + final long index = producerIndex.get(); + final int offset = calcElementOffset(index, mask); + if (index >= producerLookAhead) { + int step = lookAheadStep; + if (null == lvElement(calcElementOffset(index + step, mask))) {// LoadLoad + producerLookAhead = index + step; + } + else if (null != lvElement(offset)){ + return false; + } + } + soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() + soElement(offset, e); // StoreStore + return true; + } + + @Override + public E poll() { + final long index = consumerIndex.get(); + final int offset = calcElementOffset(index); + // local load of field to avoid repeated loads after volatile reads + final E e = lvElement(offset);// LoadLoad + if (null == e) { + return null; + } + soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() + soElement(offset, null);// StoreStore + return e; + } + + @Override + public E peek() { + return lvElement(calcElementOffset(consumerIndex.get())); + } + + @Override + public boolean isEmpty() { + return producerIndex.get() == consumerIndex.get(); + } + + @Override + public int size() { + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer + * indices, therefore protection is required to ensure size is within valid range. In the event of concurrent + * polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + return (int) (currentProducerIndex - after); + } + } + } + + private void soProducerIndex(long newIndex) { + producerIndex.lazySet(newIndex); + } + + private void soConsumerIndex(long newIndex) { + consumerIndex.lazySet(newIndex); + } + + private long lvConsumerIndex() { + return consumerIndex.get(); + } + private long lvProducerIndex() { + return producerIndex.get(); + } + +} + diff --git a/src/main/java/io/reactivesocket/internal/rx/SpscExactArrayQueue.java b/src/main/java/io/reactivesocket/internal/rx/SpscExactArrayQueue.java new file mode 100644 index 000000000..41b3664b3 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/SpscExactArrayQueue.java @@ -0,0 +1,164 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +/* + * The code was inspired by the similarly named JCTools class: + * https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic + */ + +package io.reactivesocket.internal.rx; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * A single-producer single-consumer array-backed queue with exact, non power-of-2 logical capacity. + */ +public final class SpscExactArrayQueue extends AtomicReferenceArray implements Queue { + /** */ + private static final long serialVersionUID = 6210984603741293445L; + final int mask; + final int capacitySkip; + volatile long producerIndex; + volatile long consumerIndex; + + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater PRODUCER_INDEX = + AtomicLongFieldUpdater.newUpdater(SpscExactArrayQueue.class, "producerIndex"); + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater CONSUMER_INDEX = + AtomicLongFieldUpdater.newUpdater(SpscExactArrayQueue.class, "consumerIndex"); + + public SpscExactArrayQueue(int capacity) { + super(Pow2.roundToPowerOfTwo(capacity)); + int len = length(); + this.mask = len - 1; + this.capacitySkip = len - capacity; + } + + + @Override + public boolean offer(T value) { + Objects.requireNonNull(value); + + long pi = producerIndex; + int m = mask; + + int fullCheck = (int)(pi + capacitySkip) & m; + if (get(fullCheck) != null) { + return false; + } + int offset = (int)pi & m; + PRODUCER_INDEX.lazySet(this, pi + 1); + lazySet(offset, value); + return true; + } + @Override + public T poll() { + long ci = consumerIndex; + int offset = (int)ci & mask; + T value = get(offset); + if (value == null) { + return null; + } + CONSUMER_INDEX.lazySet(this, ci + 1); + lazySet(offset, null); + return value; + } + @Override + public T peek() { + return get((int)consumerIndex & mask); + } + @Override + public void clear() { + while (poll() != null || !isEmpty()); + } + @Override + public boolean isEmpty() { + return producerIndex == consumerIndex; + } + + @Override + public int size() { + long ci = consumerIndex; + for (;;) { + long pi = producerIndex; + long ci2 = consumerIndex; + if (ci == ci2) { + return (int)(pi - ci2); + } + ci = ci2; + } + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public E[] toArray(E[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(T e) { + throw new UnsupportedOperationException(); + } + + @Override + public T remove() { + throw new UnsupportedOperationException(); + } + + @Override + public T element() { + throw new UnsupportedOperationException(); + } + +} diff --git a/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java b/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java new file mode 100644 index 000000000..233ab3472 --- /dev/null +++ b/src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java @@ -0,0 +1,188 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal.rx; +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +import java.util.*; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.Subscription; + +/** + * Arbitrates requests and cancellation between Subscriptions. + */ +public final class SubscriptionArbiter extends AtomicInteger implements Subscription { + /** */ + private static final long serialVersionUID = -2189523197179400958L; + + final Queue missedSubscription = new MpscLinkedQueue<>(); + + Subscription actual; + long requested; + + volatile boolean cancelled; + + volatile long missedRequested; + static final AtomicLongFieldUpdater MISSED_REQUESTED = + AtomicLongFieldUpdater.newUpdater(SubscriptionArbiter.class, "missedRequested"); + + volatile long missedProduced; + static final AtomicLongFieldUpdater MISSED_PRODUCED = + AtomicLongFieldUpdater.newUpdater(SubscriptionArbiter.class, "missedProduced"); + + private long addRequested(long n) { + long r = requested; + long u = BackpressureHelper.addCap(r, n); + requested = u; + return r; + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validateRequest(n)) { + return; + } + if (cancelled) { + return; + } + QueueDrainHelper.queueDrainLoop(this, () -> { + addRequested(n); + Subscription s = actual; + if (s != null) { + s.request(n); + } + }, () -> { + BackpressureHelper.add(MISSED_REQUESTED, this, n); + }, this::drain); + } + + public void produced(long n) { + if (n <= 0) { + return; + } + QueueDrainHelper.queueDrainLoop(this, () -> { + long r = requested; + if (r == Long.MAX_VALUE) { + return; + } + long u = r - n; + if (u < 0L) { + u = 0; + } + requested = u; + }, () -> { + BackpressureHelper.add(MISSED_PRODUCED, this, n); + }, this::drain); + } + + public void setSubscription(Subscription s) { + Objects.requireNonNull(s); + if (cancelled) { + s.cancel(); + return; + } + QueueDrainHelper.queueDrainLoop(this, () -> { + Subscription a = actual; + if (a != null) { + a.cancel(); + } + actual = s; + long r = requested; + if (r != 0L) { + s.request(r); + } + }, () -> { + missedSubscription.offer(s); + }, this::drain); + } + + @Override + public void cancel() { + if (cancelled) { + return; + } + cancelled = true; + QueueDrainHelper.queueDrainLoop(this, () -> { + Subscription a = actual; + if (a != null) { + actual = null; + a.cancel(); + } + }, () -> { + // nothing to queue + }, this::drain); + } + + public boolean isCancelled() { + return cancelled; + } + + void drain() { + long mr = MISSED_REQUESTED.getAndSet(this, 0L); + long mp = MISSED_PRODUCED.getAndSet(this, 0L); + Subscription ms = missedSubscription.poll(); + boolean c = cancelled; + + long r = requested; + if (r != Long.MAX_VALUE && !c) { + long u = r + mr; + if (u < 0L) { + r = Long.MAX_VALUE; + requested = Long.MAX_VALUE; + } else { + long v = u - mp; + if (v < 0L) { + v = 0L; + } + r = v; + requested = v; + } + } + + Subscription a = actual; + if (c && a != null) { + actual = null; + a.cancel(); + } + + if (ms == null) { + if (a != null && mr != 0L) { + a.request(mr); + } + } else { + if (c) { + ms.cancel(); + } else { + if (a != null) { + a.cancel(); + } + actual = ms; + if (r != 0L) { + ms.request(r); + } + } + } + } +} diff --git a/src/main/java/io/reactivesocket/internal/SubscriptionHelper.java b/src/main/java/io/reactivesocket/internal/rx/SubscriptionHelper.java similarity index 98% rename from src/main/java/io/reactivesocket/internal/SubscriptionHelper.java rename to src/main/java/io/reactivesocket/internal/rx/SubscriptionHelper.java index d0f6970ff..ad72a8d57 100644 --- a/src/main/java/io/reactivesocket/internal/SubscriptionHelper.java +++ b/src/main/java/io/reactivesocket/internal/rx/SubscriptionHelper.java @@ -11,7 +11,7 @@ * the License for the specific language governing permissions and limitations under the License. */ -package io.reactivesocket.internal; +package io.reactivesocket.internal.rx; import org.reactivestreams.*; diff --git a/src/main/java/io/reactivesocket/FairLeaseGovernor.java b/src/main/java/io/reactivesocket/lease/FairLeaseGovernor.java similarity index 95% rename from src/main/java/io/reactivesocket/FairLeaseGovernor.java rename to src/main/java/io/reactivesocket/lease/FairLeaseGovernor.java index b45176150..437a81998 100644 --- a/src/main/java/io/reactivesocket/FairLeaseGovernor.java +++ b/src/main/java/io/reactivesocket/lease/FairLeaseGovernor.java @@ -1,5 +1,7 @@ -package io.reactivesocket; +package io.reactivesocket.lease; +import io.reactivesocket.Frame; +import io.reactivesocket.LeaseGovernor; import io.reactivesocket.internal.Responder; import java.util.HashMap; diff --git a/src/main/java/io/reactivesocket/NullLeaseGovernor.java b/src/main/java/io/reactivesocket/lease/NullLeaseGovernor.java similarity index 76% rename from src/main/java/io/reactivesocket/NullLeaseGovernor.java rename to src/main/java/io/reactivesocket/lease/NullLeaseGovernor.java index 4fd85f8dd..a08fc1bac 100644 --- a/src/main/java/io/reactivesocket/NullLeaseGovernor.java +++ b/src/main/java/io/reactivesocket/lease/NullLeaseGovernor.java @@ -1,5 +1,7 @@ -package io.reactivesocket; +package io.reactivesocket.lease; +import io.reactivesocket.Frame; +import io.reactivesocket.LeaseGovernor; import io.reactivesocket.internal.Responder; public class NullLeaseGovernor implements LeaseGovernor { diff --git a/src/main/java/io/reactivesocket/UnlimitedLeaseGovernor.java b/src/main/java/io/reactivesocket/lease/UnlimitedLeaseGovernor.java similarity index 79% rename from src/main/java/io/reactivesocket/UnlimitedLeaseGovernor.java rename to src/main/java/io/reactivesocket/lease/UnlimitedLeaseGovernor.java index d080f0801..3cff13ff6 100644 --- a/src/main/java/io/reactivesocket/UnlimitedLeaseGovernor.java +++ b/src/main/java/io/reactivesocket/lease/UnlimitedLeaseGovernor.java @@ -1,5 +1,7 @@ -package io.reactivesocket; +package io.reactivesocket.lease; +import io.reactivesocket.Frame; +import io.reactivesocket.LeaseGovernor; import io.reactivesocket.internal.Responder; public class UnlimitedLeaseGovernor implements LeaseGovernor { diff --git a/src/main/java/io/reactivesocket/Completable.java b/src/main/java/io/reactivesocket/rx/Completable.java similarity index 79% rename from src/main/java/io/reactivesocket/Completable.java rename to src/main/java/io/reactivesocket/rx/Completable.java index dc1cd4c75..9f87f6a11 100644 --- a/src/main/java/io/reactivesocket/Completable.java +++ b/src/main/java/io/reactivesocket/rx/Completable.java @@ -1,4 +1,4 @@ -package io.reactivesocket; +package io.reactivesocket.rx; public interface Completable { diff --git a/src/main/java/io/reactivesocket/observable/Disposable.java b/src/main/java/io/reactivesocket/rx/Disposable.java similarity index 61% rename from src/main/java/io/reactivesocket/observable/Disposable.java rename to src/main/java/io/reactivesocket/rx/Disposable.java index 5d011fefd..df6efcda7 100644 --- a/src/main/java/io/reactivesocket/observable/Disposable.java +++ b/src/main/java/io/reactivesocket/rx/Disposable.java @@ -1,4 +1,4 @@ -package io.reactivesocket.observable; +package io.reactivesocket.rx; public interface Disposable { diff --git a/src/main/java/io/reactivesocket/observable/Observable.java b/src/main/java/io/reactivesocket/rx/Observable.java similarity index 66% rename from src/main/java/io/reactivesocket/observable/Observable.java rename to src/main/java/io/reactivesocket/rx/Observable.java index 8fe5292ee..9c5d6e39d 100644 --- a/src/main/java/io/reactivesocket/observable/Observable.java +++ b/src/main/java/io/reactivesocket/rx/Observable.java @@ -1,4 +1,4 @@ -package io.reactivesocket.observable; +package io.reactivesocket.rx; public interface Observable { diff --git a/src/main/java/io/reactivesocket/observable/Observer.java b/src/main/java/io/reactivesocket/rx/Observer.java similarity index 81% rename from src/main/java/io/reactivesocket/observable/Observer.java rename to src/main/java/io/reactivesocket/rx/Observer.java index d58a5895c..5a8bafde7 100644 --- a/src/main/java/io/reactivesocket/observable/Observer.java +++ b/src/main/java/io/reactivesocket/rx/Observer.java @@ -1,4 +1,4 @@ -package io.reactivesocket.observable; +package io.reactivesocket.rx; public interface Observer { diff --git a/src/main/java/io/reactivesocket/observable/README.md b/src/main/java/io/reactivesocket/rx/README.md similarity index 61% rename from src/main/java/io/reactivesocket/observable/README.md rename to src/main/java/io/reactivesocket/rx/README.md index 8c190d5b6..e75d96494 100644 --- a/src/main/java/io/reactivesocket/observable/README.md +++ b/src/main/java/io/reactivesocket/rx/README.md @@ -1,3 +1,3 @@ Interfaces for `Observable` that does not support backpressure. -TODO: Decide if we just use concrete types from RxJava 2 once this type exists. (Flowable vs Observable) \ No newline at end of file +TODO: Decide if we just use concrete types from RxJava 2 once this type exists. (Flowable vs Observable) (BenC would prefer this package go away) \ No newline at end of file diff --git a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java index 7e0762d3c..60291f22a 100644 --- a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java +++ b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java @@ -18,6 +18,7 @@ import io.reactivesocket.internal.PublisherUtils; import io.reactivesocket.perfutil.PerfTestConnection; +import io.reactivesocket.rx.Completable; @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) diff --git a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java index ca1c13726..2cf44038a 100644 --- a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java +++ b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java @@ -21,10 +21,10 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import io.reactivesocket.Completable; import io.reactivesocket.DuplexConnection; import io.reactivesocket.Frame; -import io.reactivesocket.observable.Observable; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Observable; public class PerfTestConnection implements DuplexConnection { diff --git a/src/perf/java/io/reactivesocket/perfutil/PerfUnicastSubjectNoBackpressure.java b/src/perf/java/io/reactivesocket/perfutil/PerfUnicastSubjectNoBackpressure.java index 4c768cc7b..32652f949 100644 --- a/src/perf/java/io/reactivesocket/perfutil/PerfUnicastSubjectNoBackpressure.java +++ b/src/perf/java/io/reactivesocket/perfutil/PerfUnicastSubjectNoBackpressure.java @@ -17,9 +17,9 @@ import java.util.function.Consumer; -import io.reactivesocket.observable.Disposable; -import io.reactivesocket.observable.Observable; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.rx.Disposable; +import io.reactivesocket.rx.Observable; +import io.reactivesocket.rx.Observer; /** * The difference between this and the real UnicastSubject is in the `onSubscribe` method where it calls requestN. Not sure that behavior should exist in the producton code. diff --git a/src/test/java/io/reactivesocket/FrameTest.java b/src/test/java/io/reactivesocket/FrameTest.java index 3a6b66631..9cd28eac3 100644 --- a/src/test/java/io/reactivesocket/FrameTest.java +++ b/src/test/java/io/reactivesocket/FrameTest.java @@ -21,7 +21,8 @@ import java.util.concurrent.TimeUnit; import io.reactivesocket.exceptions.RejectedException; -import io.reactivesocket.internal.SetupFrameFlyweight; +import io.reactivesocket.internal.frame.SetupFrameFlyweight; + import org.junit.Test; import org.junit.experimental.theories.DataPoint; import org.junit.experimental.theories.Theories; @@ -29,7 +30,7 @@ import org.junit.runner.RunWith; import uk.co.real_logic.agrona.concurrent.UnsafeBuffer; -import static io.reactivesocket.internal.ErrorFrameFlyweight.*; +import static io.reactivesocket.internal.frame.ErrorFrameFlyweight.*; import static java.nio.charset.StandardCharsets.UTF_8; @RunWith(Theories.class) diff --git a/src/test/java/io/reactivesocket/LatchedCompletable.java b/src/test/java/io/reactivesocket/LatchedCompletable.java index 159362b42..e70df1df4 100644 --- a/src/test/java/io/reactivesocket/LatchedCompletable.java +++ b/src/test/java/io/reactivesocket/LatchedCompletable.java @@ -18,6 +18,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import io.reactivesocket.rx.Completable; + public class LatchedCompletable implements Completable { final CountDownLatch latch; diff --git a/src/test/java/io/reactivesocket/ReactiveSocketTest.java b/src/test/java/io/reactivesocket/ReactiveSocketTest.java index f1b0981f1..5dbd3241b 100644 --- a/src/test/java/io/reactivesocket/ReactiveSocketTest.java +++ b/src/test/java/io/reactivesocket/ReactiveSocketTest.java @@ -35,6 +35,7 @@ import org.reactivestreams.Publisher; import io.reactivesocket.internal.PublisherUtils; +import io.reactivesocket.lease.FairLeaseGovernor; import io.reactivex.disposables.Disposable; import io.reactivex.observables.ConnectableObservable; import io.reactivex.subscribers.TestSubscriber; diff --git a/src/test/java/io/reactivesocket/SerializedEventBus.java b/src/test/java/io/reactivesocket/SerializedEventBus.java index 031b0a363..01018b9ee 100644 --- a/src/test/java/io/reactivesocket/SerializedEventBus.java +++ b/src/test/java/io/reactivesocket/SerializedEventBus.java @@ -18,7 +18,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.rx.Observer; import io.reactivex.subjects.PublishSubject; import io.reactivex.subjects.Subject; @@ -66,7 +66,7 @@ public void onComplete() { } @Override - public void onSubscribe(io.reactivesocket.observable.Disposable d) { + public void onSubscribe(io.reactivesocket.rx.Disposable d) { // TODO Auto-generated method stub } diff --git a/src/test/java/io/reactivesocket/TestConnection.java b/src/test/java/io/reactivesocket/TestConnection.java index 2b28c8445..61e1ffc43 100644 --- a/src/test/java/io/reactivesocket/TestConnection.java +++ b/src/test/java/io/reactivesocket/TestConnection.java @@ -21,7 +21,8 @@ import org.reactivestreams.Publisher; -import io.reactivesocket.observable.Observer; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Observer; import io.reactivex.Observable; import io.reactivex.Scheduler.Worker; import io.reactivex.schedulers.Schedulers; @@ -42,14 +43,14 @@ public void addOutput(Publisher o, Completable callback) { } @Override - public io.reactivesocket.observable.Observable getInput() { - return new io.reactivesocket.observable.Observable() { + public io.reactivesocket.rx.Observable getInput() { + return new io.reactivesocket.rx.Observable() { @Override public void subscribe(Observer o) { toInput.add(o); // we are okay with the race of sending data and cancelling ... since this is "hot" by definition and unsubscribing is a race. - o.onSubscribe(new io.reactivesocket.observable.Disposable() { + o.onSubscribe(new io.reactivesocket.rx.Disposable() { @Override public void dispose() { diff --git a/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java b/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java index f512310f8..e70368a36 100644 --- a/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java +++ b/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java @@ -24,6 +24,8 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import io.reactivesocket.rx.Completable; + /** * Connection that by defaults only calls request(1) on a Publisher to addOutput. Any further must be done via requestMore(n) *

diff --git a/src/test/java/io/reactivesocket/TestTransportRequestN.java b/src/test/java/io/reactivesocket/TestTransportRequestN.java index cb9ce1c85..bb3d0b785 100644 --- a/src/test/java/io/reactivesocket/TestTransportRequestN.java +++ b/src/test/java/io/reactivesocket/TestTransportRequestN.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.reactivestreams.Publisher; +import io.reactivesocket.lease.FairLeaseGovernor; import io.reactivex.subscribers.TestSubscriber; /** diff --git a/src/test/java/io/reactivesocket/internal/FragmenterTest.java b/src/test/java/io/reactivesocket/internal/FragmenterTest.java index ba6ecd598..686394d08 100644 --- a/src/test/java/io/reactivesocket/internal/FragmenterTest.java +++ b/src/test/java/io/reactivesocket/internal/FragmenterTest.java @@ -18,6 +18,9 @@ import io.reactivesocket.Frame; import io.reactivesocket.Payload; import io.reactivesocket.TestUtil; +import io.reactivesocket.internal.frame.FrameHeaderFlyweight; +import io.reactivesocket.internal.frame.PayloadFragmenter; + import org.junit.Test; import static org.junit.Assert.*; diff --git a/src/test/java/io/reactivesocket/internal/ReassemblerTest.java b/src/test/java/io/reactivesocket/internal/ReassemblerTest.java index dca8a3859..0b134d697 100644 --- a/src/test/java/io/reactivesocket/internal/ReassemblerTest.java +++ b/src/test/java/io/reactivesocket/internal/ReassemblerTest.java @@ -19,6 +19,8 @@ import io.reactivesocket.FrameType; import io.reactivesocket.Payload; import io.reactivesocket.TestUtil; +import io.reactivesocket.internal.frame.FrameHeaderFlyweight; +import io.reactivesocket.internal.frame.PayloadReassembler; import io.reactivex.subjects.ReplaySubject; import org.junit.Test; diff --git a/src/test/java/io/reactivesocket/internal/RequesterTest.java b/src/test/java/io/reactivesocket/internal/RequesterTest.java index b9858003f..56a680218 100644 --- a/src/test/java/io/reactivesocket/internal/RequesterTest.java +++ b/src/test/java/io/reactivesocket/internal/RequesterTest.java @@ -28,13 +28,13 @@ import org.junit.Test; -import io.reactivesocket.Completable; import io.reactivesocket.ConnectionSetupPayload; import io.reactivesocket.Frame; import io.reactivesocket.FrameType; import io.reactivesocket.LatchedCompletable; import io.reactivesocket.Payload; import io.reactivesocket.TestConnection; +import io.reactivesocket.rx.Completable; import io.reactivex.subscribers.TestSubscriber; import io.reactivex.Observable; import io.reactivex.subjects.ReplaySubject;