diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java b/rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java index ee43b4cb7..e242868fa 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java @@ -74,6 +74,10 @@ final class RequestChannelRequesterFlux extends Flux static final AtomicLongFieldUpdater STATE = AtomicLongFieldUpdater.newUpdater(RequestChannelRequesterFlux.class, "state"); + volatile long requestN; + static final AtomicLongFieldUpdater REQUEST_N = + AtomicLongFieldUpdater.newUpdater(RequestChannelRequesterFlux.class, "requestN"); + int streamId; boolean isFirstSignal = true; @@ -141,18 +145,22 @@ public final void request(long n) { } this.requested = Operators.addCap(this.requested, n); + Operators.addCap(REQUEST_N, this, n); - long previousState = markRequestAdded(STATE, this, n, this.requesterLeaseTracker == null); + long previousState = markRequestAdded(STATE, this, this.requesterLeaseTracker == null); if (isTerminated(previousState)) { return; } if (hasRequested(previousState)) { - if (isFirstFrameSent(previousState) - && !isMaxAllowedRequestN(extractRequestN(previousState))) { - final int streamId = this.streamId; - final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n); - this.connection.sendFrame(streamId, requestNFrame); + if (isFirstFrameSent(previousState)) { + long requestN = extractRequestN(REQUEST_N, this); + if (requestN > 0) { + final int streamId = this.streamId; + final ByteBuf requestNFrame = + RequestNFrameCodec.encode(this.allocator, streamId, requestN); + this.connection.sendFrame(streamId, requestNFrame); + } } return; } @@ -192,7 +200,7 @@ public void onNext(Payload p) { return; } // TODO: check if source is Scalar | Callable | Mono - sendFirstPayload(p, extractRequestN(state), false); + sendFirstPayload(p, false); } } else { sendFollowingPayload(p); @@ -210,12 +218,11 @@ public boolean handlePermit() { final Payload firstPayload = this.firstPayload; this.firstPayload = null; - sendFirstPayload( - firstPayload, extractRequestN(previousState), isOutboundTerminated(previousState)); + sendFirstPayload(firstPayload, isOutboundTerminated(previousState)); return true; } - void sendFirstPayload(Payload firstPayload, long initialRequestN, boolean completed) { + void sendFirstPayload(Payload firstPayload, boolean completed) { int mtu = this.mtu; try { if (!isValid(mtu, this.maxFrameLength, firstPayload, true)) { @@ -304,6 +311,8 @@ void sendFirstPayload(Payload firstPayload, long initialRequestN, boolean comple requestInterceptor.onStart(streamId, FrameType.REQUEST_CHANNEL, firstPayload.metadata()); } + long initialRequestN = extractRequestN(REQUEST_N, this); + try { sendReleasingPayload( streamId, @@ -387,21 +396,17 @@ void sendFirstPayload(Payload firstPayload, long initialRequestN, boolean comple connection.sendFrame(streamId, completeFrame); } - if (isMaxAllowedRequestN(initialRequestN)) { - return; - } - - long requestN = extractRequestN(previousState); - if (isMaxAllowedRequestN(requestN)) { - final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN); - connection.sendFrame(streamId, requestNFrame); + if (initialRequestN > Integer.MAX_VALUE) { return; } - if (requestN > initialRequestN) { - final ByteBuf requestNFrame = - RequestNFrameCodec.encode(allocator, streamId, requestN - initialRequestN); - connection.sendFrame(streamId, requestNFrame); + long requestedTimes = requestedTimes(previousState); + if (requestedTimes > 1) { + long requestN = extractRequestN(REQUEST_N, this); + if (requestN > 0) { + final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN); + connection.sendFrame(streamId, requestNFrame); + } } } @@ -731,6 +736,16 @@ public final void handlePayload(Payload value) { this.produced = produced + 1; + if (this.produced % LIMIT == 0) { + long requestN = extractRequestN(REQUEST_N, this); + if (requestN > 0) { + final int streamId = this.streamId; + final ByteBuf requestNFrame = + RequestNFrameCodec.encode(this.allocator, streamId, requestN); + this.connection.sendFrame(streamId, requestNFrame); + } + } + this.inboundSubscriber.onNext(value); } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java b/rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java index 3ff875641..044101e22 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java @@ -73,6 +73,10 @@ final class RequestChannelResponderSubscriber extends Flux static final AtomicLongFieldUpdater STATE = AtomicLongFieldUpdater.newUpdater(RequestChannelResponderSubscriber.class, "state"); + volatile long requestN; + static final AtomicLongFieldUpdater REQUEST_N = + AtomicLongFieldUpdater.newUpdater(RequestChannelResponderSubscriber.class, "requestN"); + Payload firstPayload; Subscription outboundSubscription; @@ -182,8 +186,9 @@ public void request(long n) { } this.requested = Operators.addCap(this.requested, n); + Operators.addCap(REQUEST_N, this, n); - long previousState = StateUtils.markRequestAdded(STATE, this, n); + long previousState = StateUtils.markRequestAdded(STATE, this); if (isTerminated(previousState)) { // full termination can be the result of both sides completion / cancelFrame / remote or local // error @@ -234,11 +239,14 @@ public void request(long n) { } if (hasRequested(previousState)) { - if (isFirstFrameSent(previousState) - && !isMaxAllowedRequestN(StateUtils.extractRequestN(previousState))) { - final int streamId = this.streamId; - final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n); - this.connection.sendFrame(streamId, requestNFrame); + if (isFirstFrameSent(previousState)) { + long requestN = extractRequestN(REQUEST_N, this); + if (requestN > 0) { + final int streamId = this.streamId; + final ByteBuf requestNFrame = + RequestNFrameCodec.encode(this.allocator, streamId, requestN); + this.connection.sendFrame(streamId, requestNFrame); + } } return; } @@ -279,13 +287,13 @@ public void request(long n) { return; } - long requestN = StateUtils.extractRequestN(previousState); - if (isMaxAllowedRequestN(requestN)) { + long requestN = extractRequestN(REQUEST_N, this); + if (requestN == Long.MAX_VALUE) { final int streamId = this.streamId; - final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN); + final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, requestN); this.connection.sendFrame(streamId, requestNFrame); } else { - long firstRequestN = requestN - 1; + long firstRequestN = Math.min(Integer.MAX_VALUE, requestN) - 1; if (firstRequestN > 0) { final int streamId = this.streamId; final ByteBuf requestNFrame = @@ -480,6 +488,16 @@ final void handlePayload(Payload p) { this.produced = produced + 1; + if (this.produced % LIMIT == 0) { + long requestN = extractRequestN(REQUEST_N, this); + if (requestN > 0) { + final int streamId = this.streamId; + final ByteBuf requestNFrame = + RequestNFrameCodec.encode(this.allocator, streamId, requestN); + this.connection.sendFrame(streamId, requestNFrame); + } + } + this.inboundSubscriber.onNext(p); } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestResponseRequesterMono.java b/rsocket-core/src/main/java/io/rsocket/core/RequestResponseRequesterMono.java index 86ef3be79..f9dc93e19 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestResponseRequesterMono.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestResponseRequesterMono.java @@ -138,7 +138,7 @@ public final void request(long n) { final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker; final boolean leaseEnabled = requesterLeaseTracker != null; - final long previousState = markRequestAdded(STATE, this, n, !leaseEnabled); + final long previousState = markRequestAdded(STATE, this, !leaseEnabled); if (isTerminated(previousState) || hasRequested(previousState)) { return; diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java b/rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java index d351afcff..4924e13ae 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java @@ -61,15 +61,15 @@ final class RequestStreamRequesterFlux extends Flux static final AtomicLongFieldUpdater STATE = AtomicLongFieldUpdater.newUpdater(RequestStreamRequesterFlux.class, "state"); - volatile long requested; - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(RequestStreamRequesterFlux.class, "requested"); - + volatile long requestN; + static final AtomicLongFieldUpdater REQUEST_N = + AtomicLongFieldUpdater.newUpdater(RequestStreamRequesterFlux.class, "requestN"); int streamId; CoreSubscriber inboundSubscriber; CompositeByteBuf frames; boolean done; + long requested; long produced; RequestStreamRequesterFlux(Payload payload, RequesterResponderSupport requesterResponderSupport) { @@ -140,9 +140,8 @@ public final void request(long n) { return; } - if (Operators.addCap(REQUESTED, this, n) > 0) { - return; - } + this.requested = Operators.addCap(this.requested, n); + Operators.addCap(REQUEST_N, this, n); final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker; final boolean leaseEnabled = requesterLeaseTracker != null; @@ -153,9 +152,13 @@ public final void request(long n) { if (hasRequested(previousState)) { if (isFirstFrameSent(previousState)) { - final int streamId = this.streamId; - final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n); - this.connection.sendFrame(streamId, requestNFrame); + long requestN = extractRequestN(REQUEST_N, this); + if (requestN > 0) { + final int streamId = this.streamId; + final ByteBuf requestNFrame = + RequestNFrameCodec.encode(this.allocator, streamId, requestN); + this.connection.sendFrame(streamId, requestNFrame); + } } return; } @@ -165,7 +168,7 @@ public final void request(long n) { return; } - sendFirstPayload(this.payload, n); + sendFirstPayload(this.payload); } @Override @@ -176,11 +179,11 @@ public boolean handlePermit() { return false; } - sendFirstPayload(this.payload, this.requested); + sendFirstPayload(this.payload); return true; } - void sendFirstPayload(Payload payload, long initialRequestN) { + void sendFirstPayload(Payload payload) { final RequesterResponderSupport sm = this.requesterResponderSupport; final DuplexConnection connection = this.connection; @@ -213,10 +216,9 @@ void sendFirstPayload(Payload payload, long initialRequestN) { requestInterceptor.onStart(streamId, FrameType.REQUEST_STREAM, payload.metadata()); } + long initialRequestN = extractRequestN(REQUEST_N, this); + try { - if (initialRequestN != Long.MAX_VALUE) { - REQUESTED.addAndGet() - } sendReleasingPayload( streamId, FrameType.REQUEST_STREAM, @@ -257,21 +259,17 @@ void sendFirstPayload(Payload payload, long initialRequestN) { return; } - if (isMaxAllowedRequestN(initialRequestN)) { + if (initialRequestN > Integer.MAX_VALUE) { return; } - long requestN = extractRequestN(previousState); - if (isMaxAllowedRequestN(requestN)) { - final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN); - connection.sendFrame(streamId, requestNFrame); - return; - } - - if (requestN > initialRequestN) { - final ByteBuf requestNFrame = - RequestNFrameCodec.encode(allocator, streamId, requestN - initialRequestN); - connection.sendFrame(streamId, requestNFrame); + long requestedTimes = requestedTimes(previousState); + if (requestedTimes > 1) { + long requestN = extractRequestN(REQUEST_N, this); + if (requestN > 0) { + final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN); + connection.sendFrame(streamId, requestNFrame); + } } } @@ -309,6 +307,7 @@ public final void handlePayload(Payload p) { } final long produced = this.produced; + // check overflow if (this.requested == produced) { p.release(); @@ -337,6 +336,15 @@ public final void handlePayload(Payload p) { this.produced = produced + 1; + if (this.produced % LIMIT == 0) { + long requestN = extractRequestN(REQUEST_N, this); + if (requestN > 0) { + final int streamId = this.streamId; + final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, requestN); + this.connection.sendFrame(streamId, requestNFrame); + } + } + this.inboundSubscriber.onNext(p); } @@ -444,7 +452,7 @@ public Object scanUnsafe(Attr key) { long state = this.state; if (key == Attr.TERMINATED) return isTerminated(state); - if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return extractRequestN(state); + if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return this.requested; return null; } diff --git a/rsocket-core/src/main/java/io/rsocket/core/StateUtils.java b/rsocket-core/src/main/java/io/rsocket/core/StateUtils.java index ec486d1c7..9d1c8ab54 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/StateUtils.java +++ b/rsocket-core/src/main/java/io/rsocket/core/StateUtils.java @@ -476,4 +476,29 @@ static long incrementRequestField(long a) { static boolean hasRequested(long state) { return (state & REQUEST_MASK) > 0; } + + static int requestedTimes(long state) { + return (int) (state & REQUEST_MASK); + } + + static long extractRequestN(AtomicLongFieldUpdater updater, T instance) { + for (; ; ) { + long requestN = updater.get(instance); + + if (requestN == 0) { + return 0; + } + + if (requestN == Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + + long rsocketRequestN = Math.min(requestN, Integer.MAX_VALUE); + if (updater.compareAndSet(instance, requestN, (requestN - rsocketRequestN))) { + return requestN; + } + } + } + + static final long LIMIT = (long) Integer.MAX_VALUE << 2; } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameCodec.java index 66bdd46f4..c37aba2ea 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameCodec.java @@ -25,6 +25,6 @@ public static long requestN(ByteBuf byteBuf) { byteBuf.skipBytes(FrameHeaderCodec.size()); int i = byteBuf.readInt(); byteBuf.resetReaderIndex(); - return i == Integer.MAX_VALUE ? Long.MAX_VALUE : i; + return i; } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RequestChannelRequesterFluxTest.java b/rsocket-core/src/test/java/io/rsocket/core/RequestChannelRequesterFluxTest.java index c1e0a6876..2bdf12288 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RequestChannelRequesterFluxTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RequestChannelRequesterFluxTest.java @@ -95,7 +95,8 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp Assertions.assertThat(payload.refCnt()).isOne(); activeStreams.assertNoActiveStreams(); - stateAssert.hasSubscribedFlag().hasRequestN(10).hasNoFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(1) + .hasRequestN(10).hasNoFirstFrameSentFlag(); publisher.assertMaxRequested(1).next(payload); @@ -104,7 +105,9 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp activeStreams.assertHasStream(1, requestChannelRequesterFlux); // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(10).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestN(10) + .hasRequestedTimes(1).hasFirstFrameSentFlag(); final ByteBuf frame = sender.awaitFrame(); FrameAssert.assertThat(frame) @@ -137,7 +140,9 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check. Request N Frame should sent so request field should be 0 // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(11).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestedTimes(2) + .hasRequestN(11).hasFirstFrameSentFlag(); assertSubscriber.request(Long.MAX_VALUE); final ByteBuf requestMaxNFrame = sender.awaitFrame(); @@ -152,13 +157,26 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp Assertions.assertThat(sender.isEmpty()).isTrue(); // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestedTimes(3) + .hasRequestN(Long.MAX_VALUE).hasFirstFrameSentFlag(); assertSubscriber.request(6); - Assertions.assertThat(sender.isEmpty()).isTrue(); + + final ByteBuf moreRequestMaxNFrame = sender.awaitFrame(); + FrameAssert.assertThat(moreRequestMaxNFrame) + .isNotNull() + .hasRequestN(Integer.MAX_VALUE) + .typeOf(FrameType.REQUEST_N) + .hasClientSideStreamId() + .hasStreamId(1) + .hasNoLeaks(); // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestedTimes(4) + .hasRequestN(Long.MAX_VALUE) + .hasFirstFrameSentFlag(); Payload nextPayload = TestRequesterResponderSupport.genericPayload(allocator); requestChannelRequesterFlux.handlePayload(nextPayload); @@ -174,9 +192,9 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check stateAssert - .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) - .hasFirstFrameSentFlag() + .hasSubscribedFlag() + .hasRequestedTimes(4) + .hasRequestN(Long.MAX_VALUE) .hasReassemblingFlag(); for (int i = 0; i < fragments.size(); i++) { @@ -189,9 +207,9 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check stateAssert - .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) - .hasFirstFrameSentFlag() + .hasSubscribedFlag() + .hasRequestedTimes(4) + .hasRequestN(Long.MAX_VALUE) .hasNoReassemblingFlag(); if (completionCase.equals("inbound")) { @@ -208,7 +226,8 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check stateAssert .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(4) + .hasRequestN(Long.MAX_VALUE) .hasFirstFrameSentFlag() .hasNoReassemblingFlag() .hasInboundTerminated(); @@ -222,7 +241,8 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check stateAssert .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(4) + .hasRequestN(Long.MAX_VALUE) .hasFirstFrameSentFlag() .hasNoReassemblingFlag() .hasOutboundTerminated(); @@ -272,7 +292,10 @@ public void streamShouldErrorWithoutInitializingRemoteStreamIfSourceIsEmpty(bool if (doRequest) { assertSubscriber.request(Integer.MAX_VALUE); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasNoFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestedTimes(1) + .hasRequestN(Integer.MAX_VALUE) + .hasNoFirstFrameSentFlag(); activeStreams.assertNoActiveStreams(); } @@ -317,7 +340,10 @@ public void streamShouldPropagateErrorWithoutInitializingRemoteStreamIfTheFirstS if (doRequest) { assertSubscriber.request(Integer.MAX_VALUE); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasNoFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestedTimes(1) + .hasRequestN(Integer.MAX_VALUE) + .hasNoFirstFrameSentFlag(); activeStreams.assertNoActiveStreams(); } @@ -360,7 +386,10 @@ public void streamShouldBeInHalfClosedStateOnTheInboundCancellation(String termi stateAssert.hasSubscribedFlagOnly(); assertSubscriber.request(Integer.MAX_VALUE); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasNoFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestedTimes(1) + .hasRequestN(Integer.MAX_VALUE) + .hasNoFirstFrameSentFlag(); activeStreams.assertNoActiveStreams(); Payload payload1 = TestRequesterResponderSupport.randomPayload(allocator); @@ -376,7 +405,11 @@ public void streamShouldBeInHalfClosedStateOnTheInboundCancellation(String termi .hasNoLeaks(); payload1.release(); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert + .hasSubscribedFlag() + .hasRequestedTimes(1) + .hasRequestN(Integer.MAX_VALUE) + .hasFirstFrameSentFlag(); activeStreams.assertHasStream(1, requestChannelRequesterFlux); publisher.assertMaxRequested(1); @@ -458,7 +491,10 @@ public void errorShouldTerminateExecution(String terminationMode) { stateAssert.hasSubscribedFlagOnly(); assertSubscriber.request(Integer.MAX_VALUE); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasNoFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestedTimes(1) + .hasRequestN(Integer.MAX_VALUE) + .hasNoFirstFrameSentFlag(); activeStreams.assertNoActiveStreams(); Payload payload1 = TestRequesterResponderSupport.randomPayload(allocator); @@ -474,7 +510,7 @@ public void errorShouldTerminateExecution(String terminationMode) { .hasNoLeaks(); payload1.release(); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasRequestedTimes(1).hasFirstFrameSentFlag(); activeStreams.assertHasStream(1, requestChannelRequesterFlux); publisher.assertMaxRequested(1); @@ -540,7 +576,7 @@ public void failOnOverflow() { stateAssert.hasSubscribedFlagOnly(); assertSubscriber.request(1); - stateAssert.hasSubscribedFlag().hasRequestN(1).hasNoFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(1).hasRequestN(1).hasNoFirstFrameSentFlag(); activeStreams.assertNoActiveStreams(); Payload payload1 = TestRequesterResponderSupport.randomPayload(allocator); @@ -554,7 +590,10 @@ public void failOnOverflow() { .hasNoLeaks(); payload1.release(); - stateAssert.hasSubscribedFlag().hasRequestN(1).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestedTimes(1) + .hasRequestN(1) + .hasFirstFrameSentFlag(); activeStreams.assertHasStream(1, requestChannelRequesterFlux); publisher.assertMaxRequested(1); @@ -642,13 +681,17 @@ public void shouldHaveEventsDeliveredSeriallyWhenOutboundErrorRacingWithInboundS stateAssert.hasSubscribedFlagOnly(); assertSubscriber.request(Integer.MAX_VALUE); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasNoFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(1) + .hasNoFirstFrameSentFlag(); activeStreams.assertNoActiveStreams(); Payload requestPayload = TestRequesterResponderSupport.randomPayload(allocator); publisher.next(requestPayload); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag() + .hasRequestN(Integer.MAX_VALUE) + .hasFirstFrameSentFlag(); activeStreams.assertHasStream(1, requestChannelRequesterFlux); FrameAssert.assertThat(sender.awaitFrame()) .typeOf(FrameType.REQUEST_CHANNEL) @@ -803,13 +846,13 @@ public void shouldRemoveItselfFromActiveStreamsWhenInboundAndOutboundAreTerminat stateAssert.hasSubscribedFlagOnly(); assertSubscriber.request(Integer.MAX_VALUE); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasNoFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(1).hasRequestN(Integer.MAX_VALUE).hasNoFirstFrameSentFlag(); activeStreams.assertNoActiveStreams(); Payload requestPayload = TestRequesterResponderSupport.randomPayload(allocator); publisher.next(requestPayload); - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(1).hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); activeStreams.assertHasStream(1, requestChannelRequesterFlux); FrameAssert.assertThat(sender.awaitFrame()) diff --git a/rsocket-core/src/test/java/io/rsocket/core/RequestChannelResponderSubscriberTest.java b/rsocket-core/src/test/java/io/rsocket/core/RequestChannelResponderSubscriberTest.java index 890458caf..829ba34fa 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RequestChannelResponderSubscriberTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RequestChannelResponderSubscriberTest.java @@ -85,32 +85,32 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp activeStreams.activeStreams.put(1, requestChannelResponderSubscriber); // state machine check - stateAssert.isUnsubscribed().hasRequestN(0); + stateAssert.isUnsubscribed().hasRequestedTimes(0); activeStreams.assertHasStream(1, requestChannelResponderSubscriber); publisher.subscribe(requestChannelResponderSubscriber); publisher.assertMaxRequested(1); // state machine check - stateAssert.isUnsubscribed().hasRequestN(0); + stateAssert.isUnsubscribed().hasRequestedTimes(0); final AssertSubscriber assertSubscriber = requestChannelResponderSubscriber.subscribeWith(AssertSubscriber.create(0)); Assertions.assertThat(firstPayload.refCnt()).isOne(); // state machine check - stateAssert.hasSubscribedFlagOnly().hasRequestN(0); + stateAssert.hasSubscribedFlagOnly().hasRequestedTimes(0); assertSubscriber.request(1); // state machine check - stateAssert.hasSubscribedFlag().hasFirstFrameSentFlag().hasRequestN(1); + stateAssert.hasSubscribedFlag().hasFirstFrameSentFlag().hasRequestN(1).hasRequestedTimes(1); // should not send requestN since 1 is remaining Assertions.assertThat(sender.isEmpty()).isTrue(); assertSubscriber.request(1); - stateAssert.hasSubscribedFlag().hasRequestN(2).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestN(2).hasRequestedTimes(2).hasFirstFrameSentFlag(); // should not send requestN since 1 is remaining FrameAssert.assertThat(sender.awaitFrame()) @@ -148,7 +148,7 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp Assertions.assertThat(sender.isEmpty()).isTrue(); // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestN(Long.MAX_VALUE).hasRequestedTimes(3).hasFirstFrameSentFlag(); Payload nextPayload = TestRequesterResponderSupport.genericPayload(allocator); requestChannelResponderSubscriber.handlePayload(nextPayload); @@ -165,7 +165,8 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check stateAssert .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(3) + .hasRequestN(Long.MAX_VALUE) .hasFirstFrameSentFlag() .hasReassemblingFlag(); @@ -180,7 +181,8 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check stateAssert .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(3) + .hasRequestN(Long.MAX_VALUE) .hasFirstFrameSentFlag() .hasNoReassemblingFlag(); @@ -199,7 +201,7 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check stateAssert .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestN(Long.MAX_VALUE) .hasFirstFrameSentFlag() .hasNoReassemblingFlag() .hasInboundTerminated(); @@ -221,7 +223,7 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check stateAssert .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestN(Long.MAX_VALUE) .hasFirstFrameSentFlag() .hasNoReassemblingFlag() .hasInboundTerminated(); @@ -238,7 +240,7 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp // state machine check stateAssert .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestN(Long.MAX_VALUE) .hasFirstFrameSentFlag() .hasNoReassemblingFlag() .hasOutboundTerminated(); @@ -278,32 +280,32 @@ public void failOnOverflow() { activeStreams.activeStreams.put(1, requestChannelResponderSubscriber); // state machine check - stateAssert.isUnsubscribed().hasRequestN(0); + stateAssert.isUnsubscribed().hasRequestedTimes(0); activeStreams.assertHasStream(1, requestChannelResponderSubscriber); publisher.subscribe(requestChannelResponderSubscriber); publisher.assertMaxRequested(1); // state machine check - stateAssert.isUnsubscribed().hasRequestN(0); + stateAssert.isUnsubscribed().hasRequestedTimes(0); final AssertSubscriber assertSubscriber = requestChannelResponderSubscriber.subscribeWith(AssertSubscriber.create(0)); Assertions.assertThat(firstPayload.refCnt()).isOne(); // state machine check - stateAssert.hasSubscribedFlagOnly().hasRequestN(0); + stateAssert.hasSubscribedFlagOnly().hasRequestedTimes(0); assertSubscriber.request(1); // state machine check - stateAssert.hasSubscribedFlag().hasFirstFrameSentFlag().hasRequestN(1); + stateAssert.hasSubscribedFlag().hasFirstFrameSentFlag().hasRequestedTimes(1).hasRequestN(1); // should not send requestN since 1 is remaining Assertions.assertThat(sender.isEmpty()).isTrue(); assertSubscriber.request(1); - stateAssert.hasSubscribedFlag().hasRequestN(2).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(2).hasRequestN(2).hasFirstFrameSentFlag(); // should not send requestN since 1 is remaining FrameAssert.assertThat(sender.awaitFrame()) @@ -358,20 +360,20 @@ public void failOnOverflowBeforeFirstPayloadIsSent() { activeStreams.activeStreams.put(1, requestChannelResponderSubscriber); // state machine check - stateAssert.isUnsubscribed().hasRequestN(0); + stateAssert.isUnsubscribed().hasRequestedTimes(0); activeStreams.assertHasStream(1, requestChannelResponderSubscriber); publisher.subscribe(requestChannelResponderSubscriber); publisher.assertMaxRequested(1); // state machine check - stateAssert.isUnsubscribed().hasRequestN(0); + stateAssert.isUnsubscribed().hasRequestedTimes(0); final AssertSubscriber assertSubscriber = requestChannelResponderSubscriber.subscribeWith(AssertSubscriber.create(0)); Assertions.assertThat(firstPayload.refCnt()).isOne(); // state machine check - stateAssert.hasSubscribedFlagOnly().hasRequestN(0); + stateAssert.hasSubscribedFlagOnly().hasRequestedTimes(0); Payload unrequestedPayload = TestRequesterResponderSupport.genericPayload(allocator); requestChannelResponderSubscriber.handlePayload(unrequestedPayload); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RequestResponseRequesterMonoTest.java b/rsocket-core/src/test/java/io/rsocket/core/RequestResponseRequesterMonoTest.java index b39ac62d9..169e764d7 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RequestResponseRequesterMonoTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RequestResponseRequesterMonoTest.java @@ -96,7 +96,7 @@ public void frameShouldBeSentOnSubscription( .then(() -> Assertions.assertThat(payload.refCnt()).isOne()) .then(activeStreams::assertNoActiveStreams) .thenRequest(1) - .then(() -> stateAssert.hasSubscribedFlag().hasRequestN(1).hasFirstFrameSentFlag()) + .then(() -> stateAssert.hasSubscribedFlag().hasRequestN(1).hasRequestedTimes(1).hasFirstFrameSentFlag()) .then(() -> Assertions.assertThat(payload.refCnt()).isZero()) .then(() -> activeStreams.assertHasStream(1, requestResponseRequesterMono))) .verify(); @@ -180,7 +180,8 @@ public void frameShouldBeSentOnSubscription( () -> stateAssert .hasSubscribedFlag() - .hasRequestN(1) + .hasRequestN(1) + .hasRequestedTimes(1) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .then( @@ -195,7 +196,7 @@ public void frameShouldBeSentOnSubscription( () -> stateAssert .hasSubscribedFlag() - .hasRequestN(1) + .hasRequestedTimes(1) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .then( @@ -210,7 +211,7 @@ public void frameShouldBeSentOnSubscription( () -> stateAssert .hasSubscribedFlag() - .hasRequestN(1) + .hasRequestedTimes(1) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .then( @@ -267,7 +268,7 @@ public void frameShouldBeSentOnSubscription( () -> stateAssert .hasSubscribedFlag() - .hasRequestN(1) + .hasRequestedTimes(1) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .then( @@ -279,7 +280,7 @@ public void frameShouldBeSentOnSubscription( () -> stateAssert .hasSubscribedFlag() - .hasRequestN(1) + .hasRequestedTimes(1) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .then( @@ -291,7 +292,7 @@ public void frameShouldBeSentOnSubscription( () -> stateAssert .hasSubscribedFlag() - .hasRequestN(1) + .hasRequestedTimes(1) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .then(payload::release) @@ -346,7 +347,7 @@ public void frameFragmentsShouldBeSentOnSubscription( .then(() -> Assertions.assertThat(payload.refCnt()).isOne()) .then(activeStreams::assertNoActiveStreams) .thenRequest(1) - .then(() -> stateAssert.hasSubscribedFlag().hasRequestN(1).hasFirstFrameSentFlag()) + .then(() -> stateAssert.hasSubscribedFlag().hasRequestedTimes(1).hasFirstFrameSentFlag()) .then(() -> Assertions.assertThat(payload.refCnt()).isZero()) .then(() -> activeStreams.assertHasStream(1, requestResponseRequesterMono))) .verify(); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RequestStreamRequesterFluxTest.java b/rsocket-core/src/test/java/io/rsocket/core/RequestStreamRequesterFluxTest.java index 8702d1a80..42a8e7ab4 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RequestStreamRequesterFluxTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RequestStreamRequesterFluxTest.java @@ -106,7 +106,7 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately() { activeStreams.assertHasStream(1, requestStreamRequesterFlux); // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(1).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(1).hasFirstFrameSentFlag(); final ByteBuf frame = sender.awaitFrame(); FrameAssert.assertThat(frame) @@ -139,7 +139,7 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately() { // state machine check. Request N Frame should sent so request field should be 0 // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(2).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(2).hasFirstFrameSentFlag(); assertSubscriber.request(Long.MAX_VALUE); final ByteBuf requestMaxNFrame = sender.awaitFrame(); @@ -154,13 +154,13 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately() { Assertions.assertThat(sender.isEmpty()).isTrue(); // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(Integer.MAX_VALUE).hasFirstFrameSentFlag(); assertSubscriber.request(6); Assertions.assertThat(sender.isEmpty()).isTrue(); // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(Integer.MAX_VALUE).hasFirstFrameSentFlag(); int mtu = ThreadLocalRandom.current().nextInt(64, 256); Payload randomPayload = TestRequesterResponderSupport.randomPayload(allocator); @@ -173,7 +173,7 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately() { // state machine check stateAssert .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag() .hasReassemblingFlag(); @@ -188,7 +188,7 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately() { // state machine check stateAssert .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag() .hasNoReassemblingFlag(); @@ -252,7 +252,7 @@ public void requestNFrameShouldBeSentExactlyOnceIfItIsMaxAllowed() { // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(Integer.MAX_VALUE).hasFirstFrameSentFlag(); Assertions.assertThat(payload.refCnt()).isZero(); activeStreams.assertHasStream(1, requestStreamRequesterFlux); @@ -279,7 +279,7 @@ public void requestNFrameShouldBeSentExactlyOnceIfItIsMaxAllowed() { // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(Integer.MAX_VALUE).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(Integer.MAX_VALUE).hasFirstFrameSentFlag(); requestStreamRequesterFlux.handlePayload(EmptyPayload.INSTANCE); requestStreamRequesterFlux.handleComplete(); @@ -360,7 +360,7 @@ public void frameShouldBeSentOnFirstRequest( .then( () -> // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(1).hasFirstFrameSentFlag()) + stateAssert.hasSubscribedFlag().hasRequestedTimes(1).hasFirstFrameSentFlag()) .then(() -> Assertions.assertThat(payload.refCnt()).isZero()) .then(() -> activeStreams.assertHasStream(1, requestStreamRequesterFlux))) .verify(); @@ -420,7 +420,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag()) .then(() -> rsf.handlePayload(EmptyPayload.INSTANCE)) .thenRequest(1L) @@ -429,7 +429,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag()) .expectNext(EmptyPayload.INSTANCE) .thenRequest(1L) @@ -438,7 +438,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag()) .then(rsf::handleComplete) .thenRequest(1L) @@ -456,7 +456,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag()) .then(rsf::handleComplete) .thenRequest(1L) @@ -474,7 +474,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag()) .then(() -> rsf.handleError(new ApplicationErrorException("test"))) .then( @@ -515,7 +515,7 @@ public void frameShouldBeSentOnFirstRequest( () -> // state machine check StateAssert.assertThat(rsf) - .hasRequestN(1) + .hasRequestedTimes(1) .hasSubscribedFlag() .hasFirstFrameSentFlag() .hasReassemblingFlag()) @@ -531,7 +531,7 @@ public void frameShouldBeSentOnFirstRequest( () -> // state machine check StateAssert.assertThat(rsf) - .hasRequestN(1) + .hasRequestedTimes(1) .hasSubscribedFlag() .hasFirstFrameSentFlag() .hasReassemblingFlag()) @@ -547,7 +547,7 @@ public void frameShouldBeSentOnFirstRequest( () -> // state machine check StateAssert.assertThat(rsf) - .hasRequestN(1) + .hasRequestedTimes(1) .hasSubscribedFlag() .hasFirstFrameSentFlag() .hasReassemblingFlag()) @@ -556,7 +556,7 @@ public void frameShouldBeSentOnFirstRequest( () -> // state machine check StateAssert.assertThat(rsf) - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasSubscribedFlag() .hasFirstFrameSentFlag() .hasReassemblingFlag()) @@ -572,7 +572,7 @@ public void frameShouldBeSentOnFirstRequest( () -> // state machine check StateAssert.assertThat(rsf) - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasSubscribedFlag() .hasFirstFrameSentFlag() .hasNoReassemblingFlag()) @@ -591,7 +591,7 @@ public void frameShouldBeSentOnFirstRequest( () -> // state machine check StateAssert.assertThat(rsf) - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasSubscribedFlag() .hasFirstFrameSentFlag()) .assertNext( @@ -607,7 +607,7 @@ public void frameShouldBeSentOnFirstRequest( () -> // state machine check StateAssert.assertThat(rsf) - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasSubscribedFlag() .hasFirstFrameSentFlag()) .then(rsf::handleComplete) @@ -651,7 +651,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag() .hasNoReassemblingFlag()) .then( @@ -664,7 +664,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .then( @@ -677,7 +677,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .then( @@ -690,7 +690,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .thenRequest(1) @@ -699,7 +699,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .thenRequest(1) @@ -708,7 +708,7 @@ public void frameShouldBeSentOnFirstRequest( // state machine check StateAssert.assertThat(rsf) .hasSubscribedFlag() - .hasRequestN(Integer.MAX_VALUE) + .hasRequestedTimes(Integer.MAX_VALUE) .hasFirstFrameSentFlag() .hasReassemblingFlag()) .then(payload::release) @@ -1159,7 +1159,7 @@ public void failOnOverflow() { activeStreams.assertHasStream(1, requestStreamRequesterFlux); // state machine check - stateAssert.hasSubscribedFlag().hasRequestN(1).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestedTimes(1).hasFirstFrameSentFlag(); final ByteBuf frame = sender.awaitFrame(); FrameAssert.assertThat(frame) diff --git a/rsocket-core/src/test/java/io/rsocket/core/RequesterOperatorsRacingTest.java b/rsocket-core/src/test/java/io/rsocket/core/RequesterOperatorsRacingTest.java index 06d050f6f..91a30b87f 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RequesterOperatorsRacingTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RequesterOperatorsRacingTest.java @@ -579,7 +579,7 @@ public void shouldHaveNoUnexpectedErrorDuringOnErrorAndCancelRacing(Scenario sce assertSubscriber.request(1); - stateAssert.hasSubscribedFlag().hasRequestN(1).hasFirstFrameSentFlag(); + stateAssert.hasSubscribedFlag().hasRequestN(1).hasRequestedTimes(1).hasFirstFrameSentFlag(); final ByteBuf sentFrame = activeStreams.getDuplexConnection().awaitFrame(); FrameAssert.assertThat(sentFrame) diff --git a/rsocket-core/src/test/java/io/rsocket/core/ShouldHaveFlag.java b/rsocket-core/src/test/java/io/rsocket/core/ShouldHaveFlag.java index 88e0dc8e2..fe26be8a9 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ShouldHaveFlag.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ShouldHaveFlag.java @@ -1,14 +1,12 @@ package io.rsocket.core; -import static io.rsocket.core.StateUtils.REQUEST_MASK; -import static io.rsocket.core.StateUtils.SUBSCRIBED_FLAG; -import static io.rsocket.core.StateUtils.extractRequestN; - import java.util.HashMap; import java.util.Map; import org.assertj.core.error.BasicErrorMessageFactory; import org.assertj.core.error.ErrorMessageFactory; +import static io.rsocket.core.StateUtils.*; + class ShouldHaveFlag extends BasicErrorMessageFactory { static final Map FLAGS_NAMES = @@ -70,7 +68,7 @@ private static String extractStateAsString(long currentState) { stringBuilder.append(FLAGS_NAMES.get(flag)); } } - long requestN = extractRequestN(currentState); + long requestN = requestedTimes(currentState); if (requestN > 0) { if (stringBuilder.length() > 0) { stringBuilder.append(", "); diff --git a/rsocket-core/src/test/java/io/rsocket/core/ShouldNotHaveFlag.java b/rsocket-core/src/test/java/io/rsocket/core/ShouldNotHaveFlag.java index e281e548c..ec90420c8 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ShouldNotHaveFlag.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ShouldNotHaveFlag.java @@ -1,14 +1,12 @@ package io.rsocket.core; -import static io.rsocket.core.StateUtils.REQUEST_MASK; -import static io.rsocket.core.StateUtils.SUBSCRIBED_FLAG; -import static io.rsocket.core.StateUtils.extractRequestN; - import java.util.HashMap; import java.util.Map; import org.assertj.core.error.BasicErrorMessageFactory; import org.assertj.core.error.ErrorMessageFactory; +import static io.rsocket.core.StateUtils.*; + class ShouldNotHaveFlag extends BasicErrorMessageFactory { static final Map FLAGS_NAMES = @@ -47,7 +45,7 @@ static ErrorMessageFactory shouldNotHaveFlag(long currentState, long expectedFla stringBuilder.append(FLAGS_NAMES.get(flag)); } } - long requestN = extractRequestN(currentState); + long requestN = requestedTimes(currentState); if (requestN > 0) { if (stringBuilder.length() > 0) { stringBuilder.append(", "); diff --git a/rsocket-core/src/test/java/io/rsocket/core/StateAssert.java b/rsocket-core/src/test/java/io/rsocket/core/StateAssert.java index 64253984b..767e65402 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/StateAssert.java +++ b/rsocket-core/src/test/java/io/rsocket/core/StateAssert.java @@ -20,46 +20,51 @@ import static io.rsocket.core.StateUtils.*; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + import org.assertj.core.api.AbstractAssert; import org.assertj.core.internal.Failures; public class StateAssert extends AbstractAssert, AtomicLongFieldUpdater> { public static StateAssert assertThat(AtomicLongFieldUpdater updater, T instance) { - return new StateAssert<>(updater, instance); + return new StateAssert<>(updater, instance, () -> 0L); } public static StateAssert assertThat( FireAndForgetRequesterMono instance) { - return new StateAssert<>(FireAndForgetRequesterMono.STATE, instance); + return new StateAssert<>(FireAndForgetRequesterMono.STATE, instance, () -> 0L); } public static StateAssert assertThat( RequestResponseRequesterMono instance) { - return new StateAssert<>(RequestResponseRequesterMono.STATE, instance); + return new StateAssert<>(RequestResponseRequesterMono.STATE, instance, () -> 0L); } public static StateAssert assertThat( RequestStreamRequesterFlux instance) { - return new StateAssert<>(RequestStreamRequesterFlux.STATE, instance); + return new StateAssert<>(RequestStreamRequesterFlux.STATE, instance, () -> instance.requested); } public static StateAssert assertThat( RequestChannelRequesterFlux instance) { - return new StateAssert<>(RequestChannelRequesterFlux.STATE, instance); + return new StateAssert<>(RequestChannelRequesterFlux.STATE, instance, () -> instance.requested); } public static StateAssert assertThat( RequestChannelResponderSubscriber instance) { - return new StateAssert<>(RequestChannelResponderSubscriber.STATE, instance); + return new StateAssert<>(RequestChannelResponderSubscriber.STATE, instance, () -> instance.requested); } private final Failures failures = Failures.instance(); private final T instance; + private final LongSupplier requestNSupplier; - public StateAssert(AtomicLongFieldUpdater updater, T instance) { + public StateAssert(AtomicLongFieldUpdater updater, T instance, LongSupplier requestNSupplier) { super(updater, StateAssert.class); this.instance = instance; + this.requestNSupplier = requestNSupplier; } public StateAssert isUnsubscribed() { @@ -86,19 +91,26 @@ public StateAssert hasSubscribedFlag() { return this; } - public StateAssert hasRequestN(long n) { + public StateAssert hasRequestedTimes(int n) { long currentState = actual.get(instance); - if (extractRequestN(currentState) != n) { + if (requestedTimes(currentState) != n) { throw failures.failure(info, shouldHaveRequestN(currentState, n)); } return this; } + public StateAssert hasRequestN(long n) { + final long requestN = requestNSupplier.getAsLong(); + if (requestN != n) { + throw failures.failure(info, shouldHaveRequestN(requestN, n)); + } + return this; + } + public StateAssert hasRequestNBetween(long min, long max) { - long currentState = actual.get(instance); - final long requestN = extractRequestN(currentState); + final long requestN = requestNSupplier.getAsLong(); if (requestN < min || requestN > max) { - throw failures.failure(info, shouldHaveRequestNBetween(currentState, min, max)); + throw failures.failure(info, shouldHaveRequestNBetween(requestN, min, max)); } return this; }