diff --git a/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java new file mode 100644 index 0000000000..9da6292ce9 --- /dev/null +++ b/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; + +@BenchmarkMode({Mode.AverageTime}) +@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +public class FluxBufferTimeoutBenchmark { + + private static final int TOTAL_VALUES = 100; + + @Param({"1", "10", "100"}) + int bufferSize; + + @Benchmark + public Object unlimited(Blackhole blackhole) throws InterruptedException { + JmhSubscriber subscriber = new JmhSubscriber(blackhole, false); + Flux.range(0, TOTAL_VALUES) + .bufferTimeout(bufferSize, Duration.ofDays(100), true) + .subscribe(subscriber); + subscriber.await(); + return subscriber; + } + + @Benchmark + public Object oneByOne(Blackhole blackhole) throws InterruptedException { + JmhSubscriber subscriber = new JmhSubscriber(blackhole, true); + Flux.range(0, TOTAL_VALUES) + .bufferTimeout(bufferSize, Duration.ofDays(100), true) + .subscribe(subscriber); + subscriber.await(); + return subscriber; + } + + + public static class JmhSubscriber extends CountDownLatch + implements CoreSubscriber> { + + private final Blackhole blackhole; + private final boolean oneByOneRequest; + private Subscription s; + + public JmhSubscriber(Blackhole blackhole, boolean oneByOneRequest) { + super(1); + this.blackhole = blackhole; + this.oneByOneRequest = oneByOneRequest; + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + if (oneByOneRequest) { + s.request(1); + } else { + s.request(Long.MAX_VALUE); + } + } + } + + @Override + public void onNext(List t) { + blackhole.consume(t); + if (oneByOneRequest) { + s.request(1); + } + } + + @Override + public void onError(Throwable t) { + blackhole.consume(t); + countDown(); + } + + @Override + public void onComplete() { + countDown(); + } + } +} diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java index d19e1122e4..adf6e2ccb6 100644 --- a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2023-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,10 +18,12 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.openjdk.jcstress.annotations.Actor; import org.openjdk.jcstress.annotations.Arbiter; @@ -30,23 +32,28 @@ import org.openjdk.jcstress.annotations.Outcome; import org.openjdk.jcstress.annotations.State; import org.openjdk.jcstress.infra.results.LLL_Result; +import org.openjdk.jcstress.infra.results.LL_Result; import reactor.core.util.FastLogger; import reactor.test.scheduler.VirtualTimeScheduler; +import static java.util.Collections.emptyList; + public class FluxBufferTimeoutStressTest { @JCStressTest - @Outcome(id = "1, 1, 1", expect = Expect.ACCEPTABLE, desc = "") - @Outcome(id = "2, 1, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "1, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "2, 1", expect = Expect.ACCEPTABLE, desc = "") @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndTimeout { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = - new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), null); + new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); final StressSubscription subscription = new StressSubscription<>(bufferTimeoutSubscriber); @@ -67,35 +74,42 @@ public void timeout() { } @Arbiter - public void arbiter(LLL_Result result) { + public void arbiter(LL_Result result) { result.r1 = subscriber.onNextCalls.get(); - result.r2 = subscriber.onCompleteCalls.get(); - result.r3 = subscription.requestsCount.get(); + result.r2 = subscription.requestsCount.get(); - if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + if (subscriber.onCompleteCalls.get() != 1) { + fail(fastLogger, + "unexpected completion count " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); + } + if (!subscriber.discardedValues.isEmpty()) { + fail(fastLogger, "Unexpected discarded values " + subscriber.discardedValues); } } } @JCStressTest - @Outcome(id = "3, 1, 1", expect = Expect.ACCEPTABLE, desc = "") - @Outcome(id = "4, 1, 1", expect = Expect.ACCEPTABLE, desc = "") - @Outcome(id = "5, 1, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "3, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "4, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "5, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "3, 2", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "4, 2", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "5, 2", expect = Expect.ACCEPTABLE, desc = "") @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndMoreTimeouts { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(); - final FastLogger fastLogger = new FastLogger(getClass().getName()); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); @@ -126,22 +140,25 @@ public void timeout() { } @Arbiter - public void arbiter(LLL_Result result) { + public void arbiter(LL_Result result) { result.r1 = subscriber.onNextCalls.get(); - result.r2 = subscriber.onCompleteCalls.get(); - result.r3 = subscription.requestsCount.get(); + result.r2 = subscription.requestsCount.get(); if (subscriber.onCompleteCalls.get() != 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, "unexpected completion: " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); } - if (subscriber.receivedValues.stream().anyMatch(List::isEmpty)) { - throw new IllegalStateException("received an empty buffer: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger); + if (!subscriber.discardedValues.isEmpty()) { + fail(fastLogger, "Unexpected discarded values " + subscriber.discardedValues); + } + if (!allValuesHandled(fastLogger, 5, emptyList(), + subscriber.receivedValues)) { + fail(fastLogger, "not all values delivered; result=" + result); } } } @@ -155,19 +172,23 @@ public void arbiter(LLL_Result result) { @Outcome(id = "5, 0, 3", expect = Expect.ACCEPTABLE, desc = "") @Outcome(id = "5, 0, 4", expect = Expect.ACCEPTABLE, desc = "") @Outcome(id = "5, 0, 5", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "5, 0, 1", expect = Expect.ACCEPTABLE, desc = "") @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndMoreTimeoutsPossiblyIncomplete { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(1); - final FastLogger fastLogger = new FastLogger(getClass().getName()); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); Sinks.Many proxy = Sinks.unsafe().many().unicast().onBackpressureBuffer(); + final AtomicLong requested = new AtomicLong(); + { proxy.asFlux() .doOnRequest(r -> requested.incrementAndGet()) @@ -213,29 +234,33 @@ public void arbiter(LLL_Result result) { result.r2 = subscriber.onCompleteCalls.get(); result.r3 = requested.get(); + if (!allValuesHandled(fastLogger, 4, emptyList(), subscriber.receivedValues)) { + fail(fastLogger, "minimum set of values not delivered"); + } + if (subscriber.onCompleteCalls.get() == 0) { - if (subscriber.receivedValues.stream() - .noneMatch(buf -> buf.size() == 1)) { - throw new IllegalStateException("incomplete but received all two " + - "element buffers. received: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger); + if (subscriber.receivedValues.size() == 5 && + subscriber.receivedValues.stream().noneMatch(buf -> buf.size() == 1)) { + fail(fastLogger, "incomplete but delivered all two " + + "element buffers. received: " + subscriber.receivedValues + "; result=" + result); } } - if (subscriber.onNextCalls.get() < 5 && subscriber.onCompleteCalls.get() == 0) { - throw new IllegalStateException("incomplete. received: " + subscriber.receivedValues + "; requested=" + requested.get() + "; result=" + result + "\n" + fastLogger); + if (subscriber.onNextCalls.get() < 5) { + fail(fastLogger, "incomplete. received: " + subscriber.receivedValues + "; requested=" + requested.get() + "; result=" + result); } if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, "unexpected completion " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); } if (subscriber.receivedValues.stream().anyMatch(List::isEmpty)) { - throw new IllegalStateException("received an empty buffer: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger); + fail(fastLogger, "received an empty buffer: " + subscriber.receivedValues + "; result=" + result); } } } @@ -247,12 +272,14 @@ public void arbiter(LLL_Result result) { @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancel { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = - new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), null); + new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); final StressSubscription subscription = new StressSubscription<>(bufferTimeoutSubscriber); @@ -279,13 +306,16 @@ public void arbiter(LLL_Result result) { result.r3 = subscription.requestsCount.get(); if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, "unexpected completion " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); + } + if (!allValuesHandled(fastLogger, 2, subscriber.discardedValues, subscriber.receivedValues)) { + fail(fastLogger, "Not all handled!" + "; result=" + result); } } } @@ -310,6 +340,7 @@ public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancelWithBackpres new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); Sinks.Many proxy = Sinks.unsafe().many().unicast().onBackpressureBuffer(); + AtomicLong emits = new AtomicLong(); final AtomicLong requested = new AtomicLong(); { proxy.asFlux() @@ -319,11 +350,12 @@ public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancelWithBackpres @Actor public void next() { - proxy.tryEmitNext(0L); - proxy.tryEmitNext(1L); - - proxy.tryEmitNext(2L); - proxy.tryEmitNext(3L); + for (long i = 0; i < 4; i++) { + if (proxy.tryEmitNext(i) != Sinks.EmitResult.OK) { + return; + } + emits.set(i + 1); + } proxy.tryEmitComplete(); } @@ -344,16 +376,26 @@ public void arbiter(LLL_Result result) { result.r3 = requested.get(); if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, "unexpected completion " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); } - if (subscriber.receivedValues.stream().anyMatch(List::isEmpty)) { - throw new IllegalStateException("received an empty buffer: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger); + int emits = (int) this.emits.get(); + if (subscriber.onCompleteCalls.get() == 1 && !allValuesHandled(fastLogger, 4, + emptyList(), + subscriber.receivedValues)) { + fail(fastLogger, + "Completed but not all values handled!" + "; result=" + result); + } + + if (subscriber.onNextCalls.get() > 0 && !allValuesHandled(fastLogger, emits, + subscriber.discardedValues, + subscriber.receivedValues)) { + fail(fastLogger, "Not all " + emits + " emits handled!" + "; result=" + result); } } } @@ -367,12 +409,14 @@ public void arbiter(LLL_Result result) { @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancelAndTimeout { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = - new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), null); + new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); final StressSubscription subscription = new StressSubscription<>(bufferTimeoutSubscriber); @@ -404,17 +448,71 @@ public void arbiter(LLL_Result result) { result.r3 = subscription.requestsCount.get(); if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, + "unexpected completion " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); + } + + if (!allValuesHandled(fastLogger, 2, subscriber.discardedValues, subscriber.receivedValues)) { + fail(fastLogger, "Not all handled!" + "; result=" + result); } } } + private static void fail(FastLogger fastLogger, String msg) { + throw new IllegalStateException(msg + "\n" + fastLogger); + } + + private static boolean allValuesHandled(FastLogger logger, int range, List discarded, List>... delivered) { + if (delivered.length == 0) { + return false; + } + + List discardedValues = discarded.stream() + .map(o -> (Long) o) + .collect(Collectors.toList()); + + logger.trace("discarded: " + discardedValues); + logger.trace("delivered: " + Arrays.toString(delivered)); + + boolean[] search = new boolean[range]; + for (long l : discardedValues) { + search[(int) l] = true; + } + + List> all = + Arrays.stream(delivered) + .flatMap(lists -> lists.stream()) + .collect(Collectors.toList()); + + for (List buf : all) { + if (buf.isEmpty()) { + fail(logger, "Received empty buffer!"); + } + for (long l : buf) { + if (l >= range) { + // just check within the range + continue; + } + if (search[(int) l]) { + fail(logger, "Duplicate value (both discarded " + + "and delivered, or duplicated in multiple buffers)"); + } + search[(int) l] = true; + } + } + for (boolean b : search) { + if (!b) { + return false; + } + } + return true; + } private static Supplier> bufferSupplier() { return ArrayList::new; } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index dcc6f7228f..1fa01762a4 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,12 +23,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.Function; import java.util.function.Supplier; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.Exceptions; import reactor.core.scheduler.Scheduler; import reactor.util.Logger; @@ -47,6 +49,7 @@ final class FluxBufferTimeout> extends Intern final long timespan; final TimeUnit unit; final boolean fairBackpressure; + final Logger logger; FluxBufferTimeout(Flux source, int maxSize, @@ -68,6 +71,32 @@ final class FluxBufferTimeout> extends Intern this.batchSize = maxSize; this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier"); this.fairBackpressure = fairBackpressure; + this.logger = null; + } + + // for testing + FluxBufferTimeout(Flux source, + int maxSize, + long timespan, + TimeUnit unit, + Scheduler timer, + Supplier bufferSupplier, + boolean fairBackpressure, + Logger logger) { + super(source); + if (timespan <= 0) { + throw new IllegalArgumentException("Timeout period must be strictly positive"); + } + if (maxSize <= 0) { + throw new IllegalArgumentException("maxSize must be strictly positive"); + } + this.timer = Objects.requireNonNull(timer, "Timer"); + this.timespan = timespan; + this.unit = Objects.requireNonNull(unit, "unit"); + this.batchSize = maxSize; + this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier"); + this.fairBackpressure = fairBackpressure; + this.logger = logger; } @Override @@ -102,62 +131,69 @@ public Object scanUnsafe(Attr key) { final static class BufferTimeoutWithBackpressureSubscriber> implements InnerOperator { - @Nullable - final Logger logger; - final CoreSubscriber actual; - final int batchSize; - final int prefetch; - final long timeSpan; - final TimeUnit unit; - final Scheduler.Worker timer; - final Supplier bufferSupplier; + private final @Nullable Logger logger; + private final @Nullable StateLogger stateLogger; + private final CoreSubscriber actual; + private final int batchSize; + private final int prefetch; + private final int replenishMark; + private final long timeSpan; + private final TimeUnit unit; + private final Scheduler.Worker timer; + private final Supplier bufferSupplier; + private final Disposable.Swap currentTimeoutTask = Disposables.swap(); + private final Queue queue; + + private @Nullable Subscription subscription; + + private @Nullable Throwable error; + /** + * Flag used to mark that the operator is definitely done processing all state + * transitions. + */ + private boolean done; + /** + * Access to outstanding is always guarded by volatile access to state, so it + * needn't be volatile. It is also only ever accessed in the drain method, so + * it needn't be part of state either. + */ + private int outstanding; - // tracks unsatisfied downstream demand (expressed in # of buffers) + /** + * Tracks unsatisfied downstream demand (expressed in # of buffers). Package + * visibility for testing purposes. + */ volatile long requested; @SuppressWarnings("rawtypes") - private AtomicLongFieldUpdater REQUESTED = + private static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "requested"); - // tracks undelivered values in the current buffer - volatile int index; - @SuppressWarnings("rawtypes") - private AtomicIntegerFieldUpdater INDEX = - AtomicIntegerFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "index"); - - // tracks # of values requested from upstream but not delivered yet via this - // .onNext(v) - volatile long outstanding; - @SuppressWarnings("rawtypes") - private AtomicLongFieldUpdater OUTSTANDING = - AtomicLongFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "outstanding"); - - // indicates some thread is draining - volatile int wip; - @SuppressWarnings("rawtypes") - private AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "wip"); - - private volatile int terminated = NOT_TERMINATED; + /** + * The state field serves as the coordination point for multiple actors. The + * surrounding implementation works as a state machine that provides mutual + * exclusion with lock-free semantics. It uses bit masks to divide a 64-bit + * long value for multiple concerns while maintaining atomicity with CAS operations. + */ + private volatile long state; @SuppressWarnings("rawtypes") - private AtomicIntegerFieldUpdater TERMINATED = - AtomicIntegerFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "terminated"); - - final static int NOT_TERMINATED = 0; - final static int TERMINATED_WITH_SUCCESS = 1; - final static int TERMINATED_WITH_ERROR = 2; - final static int TERMINATED_WITH_CANCEL = 3; - - @Nullable - private Subscription subscription; - - private Queue queue; - - @Nullable - Throwable error; - - boolean completed; - - Disposable currentTimeoutTask; + private static final AtomicLongFieldUpdater STATE = + AtomicLongFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "state"); + + static final long CANCELLED_FLAG = + 0b1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long TERMINATED_FLAG = + 0b0100_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long HAS_WORK_IN_PROGRESS_FLAG = + 0b0010_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long TIMEOUT_FLAG = + 0b0001_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long REQUESTED_INDEX_MASK = + 0b0000_1111_1111_1111_1111_1111_1111_1111_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long INDEX_MASK = + 0b0000_0000_0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111_1111_1111_1111L; + + private static final int INDEX_SHIFT = 0; + private static final int REQUESTED_INDEX_SHIFT = 32; public BufferTimeoutWithBackpressureSubscriber( CoreSubscriber actual, @@ -174,11 +210,13 @@ public BufferTimeoutWithBackpressureSubscriber( this.timer = timer; this.bufferSupplier = bufferSupplier; this.logger = logger; + this.stateLogger = logger != null ? new StateLogger(logger) : null; this.prefetch = batchSize << 2; + this.replenishMark = batchSize << 1; this.queue = Queues.get(prefetch).get(); } - private void trace(Logger logger, String msg) { + private static void trace(Logger logger, String msg) { logger.trace(String.format("[%s][%s]", Thread.currentThread().getId(), msg)); } @@ -191,333 +229,303 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(T t) { + public CoreSubscriber actual() { + return this.actual; + } + + @Override + public void request(long n) { if (logger != null) { - trace(logger, "onNext: " + t); - } - // check if terminated (cancelled / error / completed) -> discard value if so - - // increment index - // append to buffer - // drain - - if (terminated == NOT_TERMINATED) { - // assume no more deliveries than requested - if (!queue.offer(t)) { - Context ctx = currentContext(); - Throwable error = Operators.onOperatorError(this.subscription, - Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL), - t, actual.currentContext()); - this.error = error; - if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) { - Operators.onErrorDropped(error, ctx); - return; - } - Operators.onDiscard(t, ctx); - drain(); + trace(logger, "request " + n); + } + if (Operators.validate(n)) { + long previouslyRequested = Operators.addCap(REQUESTED, this, n); + if (previouslyRequested == Long.MAX_VALUE) { return; } - boolean shouldDrain = false; - for (;;) { - int index = this.index; - if (INDEX.compareAndSet(this, index, index + 1)) { - if (index == 0) { - try { - if (logger != null) { - trace(logger, "timerStart"); - } - currentTimeoutTask = timer.schedule(this::bufferTimedOut, - timeSpan, - unit); - } catch (RejectedExecutionException ree) { - if (logger != null) { - trace(logger, "Timer rejected for " + t); - } - Context ctx = actual.currentContext(); - Throwable error = Operators.onRejectedExecution(ree, subscription, null, t, ctx); - this.error = error; - if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) { - Operators.onDiscard(t, ctx); - Operators.onErrorDropped(error, ctx); - return; - } - if (logger != null) { - trace(logger, "Discarding upon timer rejection" + t); - } - Operators.onDiscard(t, ctx); - drain(); - return; - } - } - if ((index + 1) % batchSize == 0) { - shouldDrain = true; - } - break; - } - } - if (shouldDrain) { - if (currentTimeoutTask != null) { - // TODO: it can happen that AFTER I dispose, the timeout - // anyway kicks during/after another onNext(), the buffer is - // delivered, and THEN drain is entered -> - // it would emit a buffer that is too small potentially. - // ALSO: - // It is also possible that here we deliver the buffer, but the - // timeout is happening for a new buffer! - currentTimeoutTask.dispose(); - } - this.index = 0; - drain(); - } - } else { - if (logger != null) { - trace(logger, "Discarding onNext: " + t); + long previousState; + previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::incrementRequestIndex); + if (!hasWorkInProgress(previousState)) { + // If there was no demand before - try to fulfill the demand if there + // are buffered values. + drain(previouslyRequested == 0); } - Operators.onDiscard(t, currentContext()); } } @Override - public void onError(Throwable t) { - // set error flag - // set terminated as error + public void onNext(T t) { + if (logger != null) { + trace(logger, "onNext " + t); + } + if (this.done) { + Operators.onNextDropped(t, this.actual.currentContext()); + return; + } - // drain (WIP++ ?) + boolean enqueued = queue.offer(t); + if (!enqueued) { + this.error = Operators.onOperatorError( + this.subscription, + Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL), + t, + this.actual.currentContext()); + Operators.onDiscard(t, this.actual.currentContext()); + } - if (currentTimeoutTask != null) { - currentTimeoutTask.dispose(); + long previousState; + + if (enqueued) { + // Only onNext increments the index. Drain can set it to 0 when it + // flushes. However, timeout does not reset it to 0, it has its own + // flag. + previousState = forceAddWork(this, state -> incrementIndex(state, 1)); + + // We can only fire the timer once we increment the index first so that + // the timer doesn't fire first as it would consume the element and try + // to decrement the index below 0. + if (getIndex(previousState) == 0) { + // fire timer, new buffer starts + try { + Disposable disposable = + timer.schedule(this::bufferTimedOut, timeSpan, unit); + currentTimeoutTask.update(disposable); + } catch (RejectedExecutionException e) { + this.error = Operators.onRejectedExecution(e, subscription, null, t, actual.currentContext()); + previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTerminated); + } + } + } else { + previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTerminated); } - timer.dispose(); - if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) { - Operators.onErrorDropped(t, currentContext()); - return; + if (!hasWorkInProgress(previousState)) { + drain(false); } - this.error = t; // wip in drain will publish the error - drain(); } - @Override - public void onComplete() { - // set terminated as completed - // drain - if (currentTimeoutTask != null) { - currentTimeoutTask.dispose(); + void bufferTimedOut() { + if (logger != null) { + trace(logger, "timedOut"); } - timer.dispose(); - - if (TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_SUCCESS)) { - drain(); + if (this.done) { + return; } - } - @Override - public CoreSubscriber actual() { - return this.actual; + long previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTimedOut); + + if (!hasWorkInProgress(previousState)) { + drain(false); + } } @Override - public void request(long n) { - // add cap to currently requested - // if previous requested was 0 -> drain first to deliver outdated values - // if the cap increased, request more ? - - // drain - - if (Operators.validate(n)) { - if (queue.isEmpty() && terminated != NOT_TERMINATED) { - return; - } + public void onError(Throwable t) { + if (logger != null) { + trace(logger, "onError " + t); + } + if (this.done) { + Operators.onErrorDropped(t, actual.currentContext()); + return; + } - if (Operators.addCap(REQUESTED, this, n) == 0) { - // there was no demand before - try to fulfill the demand if there - // are buffered values - drain(); - } + this.error = t; + long previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTerminated); - if (batchSize == Integer.MAX_VALUE || n == Long.MAX_VALUE) { - requestMore(Long.MAX_VALUE); - } else { - long requestLimit = prefetch; - if (requestLimit > outstanding) { - if (logger != null) { - trace(logger, "requestMore: " + (requestLimit - outstanding) + ", outstanding: " + outstanding); - } - requestMore(requestLimit - outstanding); - } - } + if (!hasWorkInProgress(previousState)) { + drain(false); } } - private void requestMore(long n) { - Subscription s = this.subscription; - if (s != null) { - Operators.addCap(OUTSTANDING, this, n); - s.request(n); + @Override + public void onComplete() { + if (logger != null) { + trace(logger, "onComplete"); + } + if (this.done) { + return; + } + + long previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTerminated); + + if (!hasWorkInProgress(previousState)) { + drain(false); } } @Override public void cancel() { - // set terminated flag - // cancel upstream subscription - // dispose timer - // drain for proper cleanup - if (logger != null) { trace(logger, "cancel"); } - if (TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_CANCEL)) { - if (this.subscription != null) { - this.subscription.cancel(); - } - } - if (currentTimeoutTask != null) { - currentTimeoutTask.dispose(); + if (this.done || isCancelled(this.state)) { + return; } - timer.dispose(); - drain(); - } - void bufferTimedOut() { - // called when buffer times out + if (this.subscription != null) { + subscription.cancel(); + } - // reset index to 0 - // drain + long previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setCancelled); - // TODO: try comparing against current reference and see if it was not - // cancelled -> to do this, replace Disposable timeoutTask with volatile - // and use CAS. - if (logger != null) { - trace(logger, "timerFire"); + if (!hasWorkInProgress(previousState)) { + drain(false); } - this.index = 0; // if currently being drained, it means the buffer is - // delivered due to reaching the batchSize - drain(); } - private void drain() { - // entering this should be guarded by WIP getAndIncrement == 0 - // if we're here it means do a flush if there is downstream demand - // regardless of queue size + /** + * Drain the queue and perform any actions that result from the current state. + * Ths method must only be called when the caller ensured exclusive access. + * That means that it successfully indicated there's work by setting the WIP flag. + * + * @param resumeDemand {@code true} if the previous {@link #requested demand} + * value was 0. + */ + private void drain(boolean resumeDemand) { + if (logger != null) { + trace(logger, "drain start"); + } + for (;;) { + long previousState = this.state; + long currentState = previousState; - // loop: - // if terminated -> check error -> deliver; else complete downstream - // if cancelled + if (done || isCancelled(currentState)) { + if (logger != null) { + trace(logger, "Discarding entire queue of " + queue.size()); + } + Operators.onDiscardQueueWithClear(queue, currentContext(), null); + currentState = tryClearWip(this, currentState); + if (!hasWorkInProgress(currentState)) { + return; + } + } else { + long index = getIndex(currentState); + long currentRequest = this.requested; + boolean shouldFlush = currentRequest > 0 + && (resumeDemand || isTimedOut(currentState) || isTerminated(currentState) || index >= batchSize); - if (WIP.getAndIncrement(this) == 0) { - for (;;) { - int wip = this.wip; + int consumed = 0; if (logger != null) { - trace(logger, "drain. wip: " + wip); + trace(logger, "should flush: " + shouldFlush + + " currentRequest: " + currentRequest + + " index: " + index + + " isTerminated: " + isTerminated(currentState) + + " isTimedOut: " + isTimedOut(currentState)); } - if (terminated == NOT_TERMINATED) { - // is there demand? - while (flushABuffer()) { - // no-op - } - // make another spin if there's more work - } else { - if (completed) { - // if queue is empty, the discard is ignored + if (shouldFlush) { + currentTimeoutTask.update(null); + for (; ; ) { + int consumedNow = flush(); if (logger != null) { - trace(logger, "Discarding entire queue of " + queue.size()); + trace(logger, "flushed: " + consumedNow); } - Operators.onDiscardQueueWithClear(queue, currentContext(), - null); - return; - } - // TODO: potentially the below can be executed twice? - if (terminated == TERMINATED_WITH_CANCEL) { - if (logger != null) { - trace(logger, "Discarding entire queue of " + queue.size()); - } - Operators.onDiscardQueueWithClear(queue, currentContext(), - null); - return; - } - while (flushABuffer()) { - // no-op - } - if (queue.isEmpty()) { - completed = true; - if (this.error != null) { - actual.onError(this.error); + // We need to make sure that if work is added we clear the + // resumeDemand with which we entered the drain loop as the + // state is now different. + resumeDemand = false; + if (consumedNow == 0) { + break; } - else { - actual.onComplete(); + consumed += consumedNow; + if (currentRequest != Long.MAX_VALUE) { + currentRequest = REQUESTED.decrementAndGet(this); } - } else { - if (logger != null) { - trace(logger, "Queue not empty after termination"); + if (currentRequest == 0) { + break; } } } - if (WIP.compareAndSet(this, wip, 0)) { - break; - } - } - } - } - boolean flushABuffer() { - long requested = this.requested; - if (requested != 0) { - T element; - C buffer; + boolean terminated = isTerminated(currentState); - element = queue.poll(); - if (element == null) { - // there is demand, but queue is empty - return false; - } - buffer = bufferSupplier.get(); - int i = 0; - do { - buffer.add(element); - } while ((++i < batchSize) && ((element = queue.poll()) != null)); - - if (requested != Long.MAX_VALUE) { - requested = REQUESTED.decrementAndGet(this); - } - - if (logger != null) { - trace(logger, "flush: " + buffer + ", now requested: " + requested); - } - - actual.onNext(buffer); - - if (requested != Long.MAX_VALUE) { - if (logger != null) { - trace(logger, "outstanding(" + outstanding + ") -= " + i); + if (consumed > 0) { + outstanding -= consumed; } - long remaining = OUTSTANDING.addAndGet(this, -i); - if (terminated == NOT_TERMINATED) { - int replenishMark = prefetch >> 1; // TODO: create field limit instead + if (!terminated && currentRequest > 0) { + // Request more from upstream. + int remaining = this.outstanding; if (remaining < replenishMark) { - if (logger != null) { - trace(logger, "replenish: " + (prefetch - remaining) + ", outstanding: " + outstanding); - } requestMore(prefetch - remaining); } } - if (requested <= 0) { - return false; + if (terminated && queue.isEmpty()) { + done = true; + if (logger != null) { + trace(logger, "terminated! error: " + this.error + " queue size: " + queue.size()); + } + if (this.error != null) { + Operators.onDiscardQueueWithClear(queue, currentContext(), null); + actual.onError(this.error); + } else if (queue.isEmpty()) { + actual.onComplete(); + } + } + + if (consumed > 0) { + int toDecrement = -consumed; + currentState = forceUpdate(this, state -> resetTimeout(incrementIndex(state, toDecrement))); + previousState = resetTimeout(incrementIndex(previousState, toDecrement)); + } + + currentState = tryClearWip(this, previousState); + + // If the state changed (e.g. new item arrived, a request was issued, + // cancellation, error, completion) we will loop again. + if (!hasWorkInProgress(currentState)) { + if (logger != null) { + trace(logger, "drain done"); + } + return; + } + if (logger != null) { + trace(logger, "drain repeat"); } } - // continue to see if there's more - return true; } - return false; + } + + int flush() { + T element; + C buffer; + + element = queue.poll(); + if (element == null) { + // There is demand, but the queue is empty. + return 0; + } + buffer = bufferSupplier.get(); + int i = 0; + do { + buffer.add(element); + } while ((++i < batchSize) && ((element = queue.poll()) != null)); + + actual.onNext(buffer); + + return i; + } + + private void requestMore(int n) { + if (logger != null) { + trace(logger, "requestMore " + n); + } + outstanding += n; + Objects.requireNonNull(this.subscription).request(n); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return this.subscription; - if (key == Attr.CANCELLED) return terminated == TERMINATED_WITH_CANCEL; - if (key == Attr.TERMINATED) return terminated == TERMINATED_WITH_ERROR || terminated == TERMINATED_WITH_SUCCESS; + if (key == Attr.CANCELLED) return isCancelled(this.state); + if (key == Attr.TERMINATED) return isTerminated(this.state); if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested; if (key == Attr.CAPACITY) return prefetch; // TODO: revise if (key == Attr.BUFFERED) return queue.size(); @@ -526,6 +534,171 @@ public Object scanUnsafe(Attr key) { return InnerOperator.super.scanUnsafe(key); } + + /* + Below are bit field operations that aid in transitioning the state machine. + An actual update to the state field is achieved with force* method prefix. + The try* prefix indicates that if an update is unsuccessful it will return with + the current state value instead of the intended one. + In these stateful operations we use the StateLogger to indicate transitions + happening. Below are the 3-letter acronyms used: + - faw = forceAddWork + - fup = forceUpdate + - wcl = WIP cleared (meaning work-in-progress and request fields were cleared) + */ + + private static long bitwiseIncrement(long state, long mask, long shift, int amount) { + long shiftAndAdd = ((state & mask) >> shift) + amount; + long shiftBackAndLimit = (shiftAndAdd << shift) & mask; + long clearedState = state & ~mask; + return clearedState | shiftBackAndLimit; + } + + private static boolean isTerminated(long state) { + return (state & TERMINATED_FLAG) == TERMINATED_FLAG; + } + + private static long setTerminated(long state) { + return state | TERMINATED_FLAG; + } + + private static boolean isCancelled(long state) { + return (state & CANCELLED_FLAG) == CANCELLED_FLAG; + } + + private static long setCancelled(long state) { + return state | CANCELLED_FLAG; + } + + private static long incrementRequestIndex(long state) { + return bitwiseIncrement(state, REQUESTED_INDEX_MASK, REQUESTED_INDEX_SHIFT, 1); + } + + private static long getIndex(long state) { + return (state & INDEX_MASK) >> INDEX_SHIFT; + } + + private static long incrementIndex(long state, int amount) { + return bitwiseIncrement(state, INDEX_MASK, INDEX_SHIFT, amount); + } + + private static boolean hasWorkInProgress(long state) { + return (state & HAS_WORK_IN_PROGRESS_FLAG) == HAS_WORK_IN_PROGRESS_FLAG; + } + + private static long setWorkInProgress(long state) { + return state | HAS_WORK_IN_PROGRESS_FLAG; + } + + private static long setTimedOut(long state) { + return state | TIMEOUT_FLAG; + } + + private static long resetTimeout(long state) { + return state & ~TIMEOUT_FLAG; + } + + private static boolean isTimedOut(long state) { + return (state & TIMEOUT_FLAG) == TIMEOUT_FLAG; + } + + /** + * Force a state update and return the state that the update is based upon. + * If the state was replaced but there was work in progress, before leaving the + * protected section the working actor will notice an update and loop again to + * pick up the update (e.g. demand increase). If there was no active actor or + * the active actor was done before our update, the caller of this method is + * obliged to check whether the returned value (the previousState) had WIP flag + * set. In case it was not, it should call the drain procedure. + * + * @param instance target of CAS operations + * @param f transformation to apply to state + * @return state value on which the effective update is based (previousState) + */ + private static long forceAddWork( + BufferTimeoutWithBackpressureSubscriber instance, Function f) { + for (;;) { + long previousState = instance.state; + long nextState = f.apply(previousState) | HAS_WORK_IN_PROGRESS_FLAG; + if (STATE.compareAndSet(instance, previousState, nextState)) { + if (instance.stateLogger != null) { + instance.stateLogger.log(instance.toString(), + "faw", + previousState, + nextState); + } + return previousState; + } + } + } + + /** + * Unconditionally force the state transition and return the new state instead + * of the old one. The caller has no way to know whether the update happened + * while something else had WIP flag set. Therefore, this method can only be + * used in the drain procedure where the caller knows that the WIP flag is set + * and doesn't need to make that inference. + * + * @param instance target of CAS operations + * @param f transformation to apply to state + * @return effective state value (nextState) + */ + private static long forceUpdate( + BufferTimeoutWithBackpressureSubscriber instance, Function f) { + for (;;) { + long previousState = instance.state; + long nextState = f.apply(previousState); + if (STATE.compareAndSet(instance, previousState, nextState)) { + if (instance.stateLogger != null) { + instance.stateLogger.log(instance.toString(), + "fup", + previousState, + nextState); + } + return nextState; + } + } + } + + /** + * Attempt to clear the work-in-progress (WIP) flag. If the current state + * doesn't match the expected state, the current state is returned and the flag + * is not cleared. Otherwise, the effective new state is returned that has: + *
    + *
  • WIP cleared
  • + *
  • Requested index cleared
  • + *
+ * + * @param instance target of CAS operations + * @param expectedState the state reading on which the caller bases the + * intention to remove the WIP flag. In case it's + * currently different, the caller should repeat the + * drain procedure to notice any updates. + * @return current state value (currentState in case of expectations + * mismatch or nextState in case of successful WIP clearing) + */ + private static > long tryClearWip( + BufferTimeoutWithBackpressureSubscriber instance, long expectedState) { + for (;;) { + final long currentState = instance.state; + + if (expectedState != currentState) { + return currentState; + } + + // Remove both WIP and requested_index so that we avoid overflowing + long nextState = currentState & ~HAS_WORK_IN_PROGRESS_FLAG & ~REQUESTED_INDEX_MASK; + if (STATE.compareAndSet(instance, currentState, nextState)) { + if (instance.stateLogger != null) { + instance.stateLogger.log(instance.toString(), + "wcl", + currentState, + nextState); + } + return nextState; + } + } + } } final static class BufferTimeoutSubscriber> diff --git a/reactor-core/src/main/java/reactor/core/publisher/StateLogger.java b/reactor-core/src/main/java/reactor/core/publisher/StateLogger.java index e24a071e45..39e6c0deb6 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/StateLogger.java +++ b/reactor-core/src/main/java/reactor/core/publisher/StateLogger.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,7 +49,7 @@ void log(String instance, formatState(committedState, 64)), new RuntimeException()); } else { - this.logger.trace(String.format("[%s][%s][%s][%s-%s]", + this.logger.trace(String.format("[%s][%s][%s][\n\t%s\n\t%s]", instance, action, Thread.currentThread() diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutFairBackpressureTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutFairBackpressureTest.java index 01b21276a5..20a2237a9a 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutFairBackpressureTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutFairBackpressureTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -62,6 +63,7 @@ public void tearDown() { } @Test + @Tag("slow") void backpressureSupported() throws InterruptedException { final int eventProducerDelayMillis = 200; // Event Producer emits requested items to downstream with a 200ms delay @@ -365,7 +367,7 @@ public void requestedFromUpstreamShouldNotExceedDownstreamDemand() { .assertNext(s -> assertThat(s).containsExactly("a")) .then(() -> assertThat(requestedOutstanding).hasValue(19)) .thenRequest(1) - .then(() -> assertThat(requestedOutstanding).hasValue(20)) + .then(() -> assertThat(requestedOutstanding).hasValue(19)) .thenCancel() .verify(); } @@ -392,6 +394,7 @@ public void scanSubscriberCancelled() { @Test public void bufferTimeoutShouldNotRaceWithNext() { Set seen = new HashSet<>(); + AtomicBoolean complete = new AtomicBoolean(); Consumer> consumer = integers -> { for (Integer i : integers) { if (!seen.add(i)) { @@ -399,7 +402,8 @@ public void bufferTimeoutShouldNotRaceWithNext() { } } }; - CoreSubscriber> actual = new LambdaSubscriber<>(consumer, null, null, null); + CoreSubscriber> actual = new LambdaSubscriber<>( + consumer, null, () -> complete.set(true), null); FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> test = new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber>( @@ -419,6 +423,7 @@ public void bufferTimeoutShouldNotRaceWithNext() { test.onComplete(); assertThat(seen.size()).isEqualTo(500); + assertThat(complete.get()).isTrue(); } //see https://github.com/reactor/reactor-core/issues/1247 diff --git a/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java b/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java index 22d323bc9f..245c4ddef6 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java @@ -157,8 +157,7 @@ public class OnDiscardShouldNotLeakTest { DiscardScenario.fluxSource("monoFilterWhenFalse", main -> main.last().filterWhen(__ -> Mono.just(false).hide())), DiscardScenario.fluxSource("last", main -> main.last(new Tracked("default")).flatMap(f -> Mono.just(f).hide())), DiscardScenario.fluxSource("flatMapIterable", f -> f.flatMapIterable(Arrays::asList)), - // FIXME: uncomment once https://github.com/reactor/reactor-core/issues/3531 is resolved -// DiscardScenario.fluxSource("bufferTimeout", f -> f.bufferTimeout(2, Duration.ofMillis(1), true).flatMapIterable(Function.identity())), + DiscardScenario.fluxSource("bufferTimeout", f -> f.bufferTimeout(2, Duration.ofMillis(1), true).flatMapIterable(Function.identity())), DiscardScenario.fluxSource("publishOnDelayErrors", f -> f.publishOn(Schedulers.immediate())), DiscardScenario.fluxSource("publishOnImmediateErrors", f -> f.publishOn(Schedulers.immediate(), false, Queues.SMALL_BUFFER_SIZE)), DiscardScenario.fluxSource("publishOnAndPublishOn", main -> main