Skip to content

Commit

Permalink
ensures late onRequest consumer observes demand (#3557)
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Co-authored-by: Dariusz Jędrzejczyk <jdariusz@vmware.com>
  • Loading branch information
OlegDokuka and chemicL committed Aug 10, 2023
1 parent 45fe19b commit 861de73
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.LongConsumer;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.J_Result;
import org.reactivestreams.Subscription;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

public abstract class FluxCreateStressTest {

@JCStressTest
@Outcome(id = {"4"}, expect = ACCEPTABLE, desc = "demand delivered")
@State
public static class RequestAndOnRequestStressTest implements LongConsumer {

FluxSink<? super Integer> sink;

Subscription s;

volatile long observedDemand;
static final AtomicLongFieldUpdater<RequestAndOnRequestStressTest> OBSERVED_DEMAND
= AtomicLongFieldUpdater.newUpdater(RequestAndOnRequestStressTest.class, "observedDemand");

{
Flux.<Integer>create(sink -> this.sink = sink)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
RequestAndOnRequestStressTest.this.s = subscription;
}
});
}

@Override
public void accept(long value) {
Operators.addCap(OBSERVED_DEMAND, this, value);
}

@Actor
public void request() {
s.request(1);
s.request(1);
s.request(1);
s.request(1);
}

@Actor
public void setOnRequestConsumer() {
sink.onRequest(this);
}


@Arbiter
public void artiber(J_Result r) {
r.r1 = observedDemand;
}
}
}
110 changes: 87 additions & 23 deletions reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -268,7 +268,7 @@ void drainLoop() {

@Override
public FluxSink<T> onRequest(LongConsumer consumer) {
sink.onRequest(consumer, consumer, sink.requested);
sink.onPushPullRequest(consumer);
return this;
}

Expand Down Expand Up @@ -409,6 +409,8 @@ static abstract class BaseSink<T> extends AtomicBoolean
static final Disposable TERMINATED = OperatorDisposables.DISPOSED;
static final Disposable CANCELLED = Disposables.disposed();

static final LongConsumer NOOP_CONSUMER = n -> {};

final CoreSubscriber<? super T> actual;
final Context ctx;

Expand All @@ -434,6 +436,7 @@ static abstract class BaseSink<T> extends AtomicBoolean
BaseSink(CoreSubscriber<? super T> actual) {
this.actual = actual;
this.ctx = actual.currentContext();
REQUESTED.lazySet(this, Long.MIN_VALUE);
}

@Override
Expand Down Expand Up @@ -500,7 +503,7 @@ void disposeResource(boolean isCancel) {

@Override
public long requestedFromDownstream() {
return requested;
return requested & Long.MAX_VALUE;
}

void onCancel() {
Expand All @@ -519,12 +522,15 @@ final boolean isTerminated() {
@Override
public final void request(long n) {
if (Operators.validate(n)) {
Operators.addCap(REQUESTED, this, n);
long s = addCap(this, n);

LongConsumer consumer = requestConsumer;
if (n > 0 && consumer != null && !isCancelled()) {
consumer.accept(n);
if (hasRequestConsumer(s)) {
LongConsumer consumer = requestConsumer;
if (!isCancelled()) {
consumer.accept(n);
}
}

onRequestedFromDownstream();
}
}
Expand All @@ -541,20 +547,29 @@ public CoreSubscriber<? super T> actual() {
@Override
public FluxSink<T> onRequest(LongConsumer consumer) {
Objects.requireNonNull(consumer, "onRequest");
onRequest(consumer, n -> {
}, Long.MAX_VALUE);
onPushRequest(consumer);
return this;
}

protected void onRequest(LongConsumer initialRequestConsumer,
LongConsumer requestConsumer,
long value) {
protected void onPushRequest(LongConsumer initialRequestConsumer) {
if (!REQUEST_CONSUMER.compareAndSet(this, null, NOOP_CONSUMER)) {
throw new IllegalStateException(
"A consumer has already been assigned to consume requests");
}

// do not change real flag since real consumer is technically absent
initialRequestConsumer.accept(Long.MAX_VALUE);
}

protected void onPushPullRequest(LongConsumer requestConsumer) {
if (!REQUEST_CONSUMER.compareAndSet(this, null, requestConsumer)) {
throw new IllegalStateException(
"A consumer has already been assigned to consume requests");
}
else if (value > 0) {
initialRequestConsumer.accept(value);

long initialRequest = markRequestConsumerSet(this);
if (initialRequest > 0) {
requestConsumer.accept(initialRequest);
}
}

Expand Down Expand Up @@ -607,7 +622,7 @@ else if (c instanceof SinkDisposable) {
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return disposable == TERMINATED;
if (key == Attr.CANCELLED) return disposable == CANCELLED;
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested;
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requestedFromDownstream();
if (key == Attr.RUN_STYLE) return Attr.RunStyle.ASYNC;

return InnerProducer.super.scanUnsafe(key);
Expand All @@ -617,6 +632,54 @@ public Object scanUnsafe(Attr key) {
public String toString() {
return "FluxSink";
}

static <T> void produced(BaseSink<T> instance, long toSub) {
long s, r, u;
do {
s = instance.requested;
r = s & Long.MAX_VALUE;
if (r == 0 || r == Long.MAX_VALUE) {
return;
}
u = Operators.subOrZero(r, toSub);
} while (!REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE)));
}


static <T> long addCap(BaseSink<T> instance, long toAdd) {
long r, u, s;
for (;;) {
s = instance.requested;
r = s & Long.MAX_VALUE;
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
u = Operators.addCap(r, toAdd);
if (REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE))) {
return s;
}
}
}

static <T> long markRequestConsumerSet(BaseSink<T> instance) {
long u, s;
for (;;) {
s = instance.requested;

if (hasRequestConsumer(s)) {
return s;
}

u = s & Long.MAX_VALUE;
if (REQUESTED.compareAndSet(instance, s, u)) {
return u;
}
}
}

static boolean hasRequestConsumer(long requestedState) {
return (requestedState & Long.MIN_VALUE) == 0;
}
}

static final class IgnoreSink<T> extends BaseSink<T> {
Expand All @@ -639,8 +702,9 @@ public FluxSink<T> next(T t) {
actual.onNext(t);

for (; ; ) {
long r = requested;
if (r == 0L || REQUESTED.compareAndSet(this, r, r - 1)) {
long s = requested;
long r = s & Long.MAX_VALUE;
if (r == 0L || REQUESTED.compareAndSet(this, s, (r - 1) | (s & Long.MIN_VALUE))) {
return this;
}
}
Expand All @@ -665,9 +729,9 @@ public final FluxSink<T> next(T t) {
return this;
}

if (requested != 0) {
if (requestedFromDownstream() != 0) {
actual.onNext(t);
Operators.produced(REQUESTED, this, 1);
produced(this, 1);
}
else {
onOverflow();
Expand Down Expand Up @@ -776,7 +840,7 @@ void drain() {
final Queue<T> q = queue;

for (; ; ) {
long r = requested;
long r = requestedFromDownstream();
long e = 0L;

while (e != r) {
Expand Down Expand Up @@ -844,7 +908,7 @@ void drain() {
}

if (e != 0) {
Operators.produced(REQUESTED, this, e);
produced(this, e);
}

if (WIP.decrementAndGet(this) == 0) {
Expand Down Expand Up @@ -936,7 +1000,7 @@ void drain() {
final AtomicReference<T> q = queue;

for (; ; ) {
long r = requested;
long r = requestedFromDownstream();
long e = 0L;

while (e != r) {
Expand Down Expand Up @@ -1006,7 +1070,7 @@ void drain() {
}

if (e != 0) {
Operators.produced(REQUESTED, this, e);
produced(this, e);
}

if (WIP.decrementAndGet(this) == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,6 +108,11 @@ default ContextView contextView() {
* or {@link Flux#create(java.util.function.Consumer, FluxSink.OverflowStrategy)},
* the consumer
* is invoked for every request to enable a hybrid backpressure-enabled push/pull model.
* <p>
* <strong>Note:</strong> in case of multiple {@link Subscription#request} happening
* concurrently to this method, the first consumer invocation may process
* accumulated demand instead of being called multiple times.
* <p>
* When bridging with asynchronous listener-based APIs, the {@code onRequest} callback
* may be used to request more data from source if required and to manage backpressure
* by delivering data to sink only when requests are pending.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,10 @@

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +30,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.EnumSource;
import org.reactivestreams.Subscriber;
Expand All @@ -53,6 +57,36 @@

class FluxCreateTest {

@Test
//https://github.com/reactor/reactor-core/issues/1949
void ensuresConcurrentRequestAndSettingOnRequestAlwaysDeliversDemand() throws ExecutionException, InterruptedException {
AtomicReference<Subscription> sub = new AtomicReference<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
int i = 0;
int attempts = 100;
for (AtomicBoolean requested = new AtomicBoolean(true); requested.getAndSet(false) && i < attempts; ++i) {
CountDownLatch latch = new CountDownLatch(1);
FutureTask<Void> task = new FutureTask<>(() -> sub.get().request(1), null);
Flux.create(sink -> sink.onRequest(__ -> {
requested.set(true);
// onRequest can be delivered asynchronously after request(n) has
// returned, so latch coordinates successful request->onRequest
// completion.
latch.countDown();
}))
.subscribe(new BaseSubscriber<Object>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
sub.set(subscription);
executor.execute(task);
}
});
latch.await(100, TimeUnit.MILLISECONDS);
}
executor.shutdown();
Assertions.assertThat(i).as("Failed after %d attempts", i).isEqualTo(attempts);
}

@Test
void normalBuffered() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Expand Down

0 comments on commit 861de73

Please sign in to comment.