From c3125c49851d23f26397fb2d5185a7e6d84fe50b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Tue, 7 Nov 2023 13:53:35 +0100 Subject: [PATCH 01/10] Handling late arriving drain operations in bufferTimeout --- .../core/publisher/FluxBufferTimeout.java | 45 +++++++------------ 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index dcc6f7228f..e9603db174 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -419,38 +419,27 @@ private void drain() { } // make another spin if there's more work } else { - if (completed) { - // if queue is empty, the discard is ignored + if (completed || terminated == TERMINATED_WITH_CANCEL) { if (logger != null) { trace(logger, "Discarding entire queue of " + queue.size()); } - Operators.onDiscardQueueWithClear(queue, currentContext(), - null); - return; - } - // TODO: potentially the below can be executed twice? - if (terminated == TERMINATED_WITH_CANCEL) { - if (logger != null) { - trace(logger, "Discarding entire queue of " + queue.size()); - } - Operators.onDiscardQueueWithClear(queue, currentContext(), - null); - return; - } - while (flushABuffer()) { - // no-op - } - if (queue.isEmpty()) { - completed = true; - if (this.error != null) { - actual.onError(this.error); - } - else { - actual.onComplete(); - } + Operators.onDiscardQueueWithClear(queue, currentContext(), null); } else { - if (logger != null) { - trace(logger, "Queue not empty after termination"); + while (flushABuffer()) { + // no-op + } + if (queue.isEmpty()) { + completed = true; + if (this.error != null) { + actual.onError(this.error); + } + else { + actual.onComplete(); + } + } else { + if (logger != null) { + trace(logger, "Queue not empty after termination"); + } } } } From 033495922f1a92396def5ea6a240afd2035ba756 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 22 Nov 2023 13:59:03 +0100 Subject: [PATCH 02/10] Improved tests to cover discarded values --- .../FluxBufferTimeoutStressTest.java | 195 +++++++++++++----- .../core/publisher/FluxBufferTimeout.java | 27 +++ 2 files changed, 172 insertions(+), 50 deletions(-) diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java index d19e1122e4..d85bcb5461 100644 --- a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java @@ -18,10 +18,12 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.openjdk.jcstress.annotations.Actor; import org.openjdk.jcstress.annotations.Arbiter; @@ -30,23 +32,28 @@ import org.openjdk.jcstress.annotations.Outcome; import org.openjdk.jcstress.annotations.State; import org.openjdk.jcstress.infra.results.LLL_Result; +import org.openjdk.jcstress.infra.results.LL_Result; import reactor.core.util.FastLogger; import reactor.test.scheduler.VirtualTimeScheduler; +import static java.util.Collections.emptyList; + public class FluxBufferTimeoutStressTest { @JCStressTest - @Outcome(id = "1, 1, 1", expect = Expect.ACCEPTABLE, desc = "") - @Outcome(id = "2, 1, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "1, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "2, 1", expect = Expect.ACCEPTABLE, desc = "") @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndTimeout { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = - new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), null); + new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); final StressSubscription subscription = new StressSubscription<>(bufferTimeoutSubscriber); @@ -67,35 +74,39 @@ public void timeout() { } @Arbiter - public void arbiter(LLL_Result result) { + public void arbiter(LL_Result result) { result.r1 = subscriber.onNextCalls.get(); - result.r2 = subscriber.onCompleteCalls.get(); - result.r3 = subscription.requestsCount.get(); + result.r2 = subscription.requestsCount.get(); - if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + if (subscriber.onCompleteCalls.get() != 1) { + fail(fastLogger, + "unexpected completion count " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); + } + if (!subscriber.discardedValues.isEmpty()) { + fail(fastLogger, "Unexpected discarded values " + subscriber.discardedValues); } } } @JCStressTest - @Outcome(id = "3, 1, 1", expect = Expect.ACCEPTABLE, desc = "") - @Outcome(id = "4, 1, 1", expect = Expect.ACCEPTABLE, desc = "") - @Outcome(id = "5, 1, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "3, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "4, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "5, 1", expect = Expect.ACCEPTABLE, desc = "") @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndMoreTimeouts { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(); - final FastLogger fastLogger = new FastLogger(getClass().getName()); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); @@ -126,22 +137,25 @@ public void timeout() { } @Arbiter - public void arbiter(LLL_Result result) { + public void arbiter(LL_Result result) { result.r1 = subscriber.onNextCalls.get(); - result.r2 = subscriber.onCompleteCalls.get(); - result.r3 = subscription.requestsCount.get(); + result.r2 = subscription.requestsCount.get(); if (subscriber.onCompleteCalls.get() != 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, "unexpected completion: " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); } - if (subscriber.receivedValues.stream().anyMatch(List::isEmpty)) { - throw new IllegalStateException("received an empty buffer: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger); + if (!subscriber.discardedValues.isEmpty()) { + fail(fastLogger, "Unexpected discarded values " + subscriber.discardedValues); + } + if (!allValuesHandled(fastLogger, 5, emptyList(), + subscriber.receivedValues)) { + fail(fastLogger, "not all values delivered; result=" + result); } } } @@ -158,16 +172,19 @@ public void arbiter(LLL_Result result) { @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndMoreTimeoutsPossiblyIncomplete { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(1); - final FastLogger fastLogger = new FastLogger(getClass().getName()); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); Sinks.Many proxy = Sinks.unsafe().many().unicast().onBackpressureBuffer(); + final AtomicLong requested = new AtomicLong(); + { proxy.asFlux() .doOnRequest(r -> requested.incrementAndGet()) @@ -213,29 +230,34 @@ public void arbiter(LLL_Result result) { result.r2 = subscriber.onCompleteCalls.get(); result.r3 = requested.get(); + if (!allValuesHandled(fastLogger, 4, emptyList(), subscriber.receivedValues)) { + fail(fastLogger, "minimum set of values not delivered"); + } + if (subscriber.onCompleteCalls.get() == 0) { - if (subscriber.receivedValues.stream() - .noneMatch(buf -> buf.size() == 1)) { - throw new IllegalStateException("incomplete but received all two " + - "element buffers. received: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger); + if (subscriber.receivedValues.size() == 5 && + subscriber.receivedValues.stream().noneMatch(buf -> buf.size() == 1)) { + fail(fastLogger, "incomplete but delivered all two " + + "element buffers. received: " + subscriber.receivedValues + "; result=" + result); } } + // TODO #onNext < 5 and incomplete => why fail? if (subscriber.onNextCalls.get() < 5 && subscriber.onCompleteCalls.get() == 0) { - throw new IllegalStateException("incomplete. received: " + subscriber.receivedValues + "; requested=" + requested.get() + "; result=" + result + "\n" + fastLogger); + fail(fastLogger, "incomplete. received: " + subscriber.receivedValues + "; requested=" + requested.get() + "; result=" + result); } if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, "unexpected completion " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); } if (subscriber.receivedValues.stream().anyMatch(List::isEmpty)) { - throw new IllegalStateException("received an empty buffer: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger); + fail(fastLogger, "received an empty buffer: " + subscriber.receivedValues + "; result=" + result); } } } @@ -247,12 +269,14 @@ public void arbiter(LLL_Result result) { @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancel { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = - new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), null); + new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); final StressSubscription subscription = new StressSubscription<>(bufferTimeoutSubscriber); @@ -279,13 +303,16 @@ public void arbiter(LLL_Result result) { result.r3 = subscription.requestsCount.get(); if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, "unexpected completion " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); + } + if (!allValuesHandled(fastLogger, 2, subscriber.discardedValues, subscriber.receivedValues)) { + fail(fastLogger, "Not all handled!" + "; result=" + result); } } } @@ -310,6 +337,7 @@ public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancelWithBackpres new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); Sinks.Many proxy = Sinks.unsafe().many().unicast().onBackpressureBuffer(); + AtomicLong emits = new AtomicLong(); final AtomicLong requested = new AtomicLong(); { proxy.asFlux() @@ -319,11 +347,12 @@ public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancelWithBackpres @Actor public void next() { - proxy.tryEmitNext(0L); - proxy.tryEmitNext(1L); - - proxy.tryEmitNext(2L); - proxy.tryEmitNext(3L); + for (long i = 0; i < 4; i++) { + if (proxy.tryEmitNext(i) != Sinks.EmitResult.OK) { + return; + } + emits.set(i + 1); + } proxy.tryEmitComplete(); } @@ -344,16 +373,26 @@ public void arbiter(LLL_Result result) { result.r3 = requested.get(); if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, "unexpected completion " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); } - if (subscriber.receivedValues.stream().anyMatch(List::isEmpty)) { - throw new IllegalStateException("received an empty buffer: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger); + int emits = (int) this.emits.get(); + if (subscriber.onCompleteCalls.get() == 1 && !allValuesHandled(fastLogger, 4, + emptyList(), + subscriber.receivedValues)) { + fail(fastLogger, + "Completed but not all values handled!" + "; result=" + result); + } + + if (subscriber.onNextCalls.get() > 0 && !allValuesHandled(fastLogger, emits, + subscriber.discardedValues, + subscriber.receivedValues)) { + fail(fastLogger, "Not all " + emits + " emits handled!" + "; result=" + result); } } } @@ -367,12 +406,14 @@ public void arbiter(LLL_Result result) { @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancelAndTimeout { + final FastLogger fastLogger = new FastLogger(getClass().getName()); + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); final StressSubscriber> subscriber = new StressSubscriber<>(); final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> bufferTimeoutSubscriber = - new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), null); + new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger); final StressSubscription subscription = new StressSubscription<>(bufferTimeoutSubscriber); @@ -404,17 +445,71 @@ public void arbiter(LLL_Result result) { result.r3 = subscription.requestsCount.get(); if (subscriber.onCompleteCalls.get() > 1) { - throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get()); + fail(fastLogger, + "unexpected completion " + subscriber.onCompleteCalls.get()); } if (subscriber.concurrentOnComplete.get()) { - throw new IllegalStateException("subscriber concurrent onComplete"); + fail(fastLogger, "subscriber concurrent onComplete"); } if (subscriber.concurrentOnNext.get()) { - throw new IllegalStateException("subscriber concurrent onNext"); + fail(fastLogger, "subscriber concurrent onNext"); + } + + if (!allValuesHandled(fastLogger, 2, subscriber.discardedValues, subscriber.receivedValues)) { + fail(fastLogger, "Not all handled!" + "; result=" + result); } } } + private static void fail(FastLogger fastLogger, String msg) { + throw new IllegalStateException(msg + "\n" + fastLogger); + } + + private static boolean allValuesHandled(FastLogger logger, int range, List discarded, List>... delivered) { + if (delivered.length == 0) { + return false; + } + + List discardedValues = discarded.stream() + .map(o -> (Long) o) + .collect(Collectors.toList()); + + logger.trace("discarded: " + discardedValues); + logger.trace("delivered: " + Arrays.toString(delivered)); + + boolean[] search = new boolean[range]; + for (long l : discardedValues) { + search[(int) l] = true; + } + + List> all = + Arrays.stream(delivered) + .flatMap(lists -> lists.stream()) + .collect(Collectors.toList()); + + for (List buf : all) { + if (buf.isEmpty()) { + fail(logger, "Received empty buffer!"); + } + for (long l : buf) { + if (l >= range) { + // just check within the range + continue; + } + if (search[(int) l]) { + fail(logger, "Duplicate value (both discarded " + + "and delivered, or duplicated in multiple buffers)"); + } + search[(int) l] = true; + } + } + for (boolean b : search) { + if (!b) { + return false; + } + } + return true; + } private static Supplier> bufferSupplier() { return ArrayList::new; } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index e9603db174..e3cd882e10 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -47,6 +47,7 @@ final class FluxBufferTimeout> extends Intern final long timespan; final TimeUnit unit; final boolean fairBackpressure; + final Logger logger; FluxBufferTimeout(Flux source, int maxSize, @@ -68,6 +69,32 @@ final class FluxBufferTimeout> extends Intern this.batchSize = maxSize; this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier"); this.fairBackpressure = fairBackpressure; + this.logger = null; + } + + // for testing + FluxBufferTimeout(Flux source, + int maxSize, + long timespan, + TimeUnit unit, + Scheduler timer, + Supplier bufferSupplier, + boolean fairBackpressure, + Logger logger) { + super(source); + if (timespan <= 0) { + throw new IllegalArgumentException("Timeout period must be strictly positive"); + } + if (maxSize <= 0) { + throw new IllegalArgumentException("maxSize must be strictly positive"); + } + this.timer = Objects.requireNonNull(timer, "Timer"); + this.timespan = timespan; + this.unit = Objects.requireNonNull(unit, "unit"); + this.batchSize = maxSize; + this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier"); + this.fairBackpressure = fairBackpressure; + this.logger = logger; } @Override From 72d442b52985bf7fc2cf36eacfe98e4fd6de0758 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 22 Nov 2023 14:09:26 +0100 Subject: [PATCH 03/10] Temporary improvements to address most identified race conditions --- .../core/publisher/FluxBufferTimeout.java | 70 ++++++++++--------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index e3cd882e10..cf6845c6e7 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -237,6 +237,10 @@ public void onNext(T t) { t, actual.currentContext()); this.error = error; if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) { + if (logger != null) { + trace(logger, "Failed to transition to error. Discarding " + t); + } + Operators.onDiscard(t, ctx); Operators.onErrorDropped(error, ctx); return; } @@ -265,14 +269,13 @@ public void onNext(T t) { Throwable error = Operators.onRejectedExecution(ree, subscription, null, t, ctx); this.error = error; if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) { - Operators.onDiscard(t, ctx); + Operators.onDiscardQueueWithClear(queue, currentContext(), null); Operators.onErrorDropped(error, ctx); return; } if (logger != null) { - trace(logger, "Discarding upon timer rejection" + t); + trace(logger, "Draining upon timer rejection" + t); } - Operators.onDiscard(t, ctx); drain(); return; } @@ -295,8 +298,8 @@ public void onNext(T t) { currentTimeoutTask.dispose(); } this.index = 0; - drain(); } + drain(shouldDrain); } else { if (logger != null) { trace(logger, "Discarding onNext: " + t); @@ -395,10 +398,9 @@ public void cancel() { if (logger != null) { trace(logger, "cancel"); } - if (TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_CANCEL)) { - if (this.subscription != null) { - this.subscription.cancel(); - } + TERMINATED.set(this, TERMINATED_WITH_CANCEL); + if (this.subscription != null) { + this.subscription.cancel(); } if (currentTimeoutTask != null) { currentTimeoutTask.dispose(); @@ -425,6 +427,9 @@ void bufferTimedOut() { } private void drain() { + drain(true); + } + private void drain(boolean flush) { // entering this should be guarded by WIP getAndIncrement == 0 // if we're here it means do a flush if there is downstream demand // regardless of queue size @@ -439,39 +444,38 @@ private void drain() { if (logger != null) { trace(logger, "drain. wip: " + wip); } - if (terminated == NOT_TERMINATED) { - // is there demand? - while (flushABuffer()) { - // no-op + boolean done = terminated != NOT_TERMINATED; + boolean cancelled = terminated == TERMINATED_WITH_CANCEL; + if (completed || cancelled) { + if (logger != null) { + trace(logger, "Discarding entire queue of " + queue.size()); } - // make another spin if there's more work + Operators.onDiscardQueueWithClear(queue, currentContext(), null); +// return; } else { - if (completed || terminated == TERMINATED_WITH_CANCEL) { - if (logger != null) { - trace(logger, "Discarding entire queue of " + queue.size()); - } - Operators.onDiscardQueueWithClear(queue, currentContext(), null); - } else { + if (flush) { while (flushABuffer()) { // no-op } - if (queue.isEmpty()) { - completed = true; - if (this.error != null) { - actual.onError(this.error); - } - else { - actual.onComplete(); - } - } else { - if (logger != null) { - trace(logger, "Queue not empty after termination"); - } + } + } + wip = this.wip; + if (wip == 1) { + if (done && !cancelled && !completed && queue.isEmpty()) { + if (logger != null) { + trace(logger, "Completed with " + terminated); + } + completed = true; + if (this.error != null) { + actual.onError(this.error); + } + else { + actual.onComplete(); } } } - if (WIP.compareAndSet(this, wip, 0)) { - break; + if (WIP.decrementAndGet(this) == 0) { + return; } } } From 16d2d1f56e26c5e94c6aec043f5d003123d3f07d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Mon, 29 Apr 2024 18:19:37 +0200 Subject: [PATCH 04/10] Re-work bufferTimeout with fair backpressure to consistent state machine --- .../FluxBufferTimeoutStressTest.java | 9 +- .../core/publisher/FluxBufferTimeout.java | 724 +++++++++++------- .../reactor/core/publisher/StateLogger.java | 4 +- ...FluxBufferTimeoutFairBackpressureTest.java | 11 +- .../publisher/OnDiscardShouldNotLeakTest.java | 3 +- 5 files changed, 445 insertions(+), 306 deletions(-) diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java index d85bcb5461..adf6e2ccb6 100644 --- a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxBufferTimeoutStressTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2023-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -98,6 +98,9 @@ public void arbiter(LL_Result result) { @Outcome(id = "3, 1", expect = Expect.ACCEPTABLE, desc = "") @Outcome(id = "4, 1", expect = Expect.ACCEPTABLE, desc = "") @Outcome(id = "5, 1", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "3, 2", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "4, 2", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "5, 2", expect = Expect.ACCEPTABLE, desc = "") @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndMoreTimeouts { @@ -169,6 +172,7 @@ public void arbiter(LL_Result result) { @Outcome(id = "5, 0, 3", expect = Expect.ACCEPTABLE, desc = "") @Outcome(id = "5, 0, 4", expect = Expect.ACCEPTABLE, desc = "") @Outcome(id = "5, 0, 5", expect = Expect.ACCEPTABLE, desc = "") + @Outcome(id = "5, 0, 1", expect = Expect.ACCEPTABLE, desc = "") @State public static class FluxBufferTimeoutStressTestRaceDeliveryAndMoreTimeoutsPossiblyIncomplete { @@ -242,8 +246,7 @@ public void arbiter(LLL_Result result) { } } - // TODO #onNext < 5 and incomplete => why fail? - if (subscriber.onNextCalls.get() < 5 && subscriber.onCompleteCalls.get() == 0) { + if (subscriber.onNextCalls.get() < 5) { fail(fastLogger, "incomplete. received: " + subscriber.receivedValues + "; requested=" + requested.get() + "; result=" + result); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index cf6845c6e7..a0c6648832 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,12 +23,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.Function; import java.util.function.Supplier; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.Exceptions; import reactor.core.scheduler.Scheduler; import reactor.util.Logger; @@ -129,15 +131,22 @@ public Object scanUnsafe(Attr key) { final static class BufferTimeoutWithBackpressureSubscriber> implements InnerOperator { - @Nullable - final Logger logger; - final CoreSubscriber actual; - final int batchSize; - final int prefetch; - final long timeSpan; - final TimeUnit unit; - final Scheduler.Worker timer; - final Supplier bufferSupplier; + final @Nullable Logger logger; + final @Nullable StateLogger stateLogger; + final CoreSubscriber actual; + final int batchSize; + final int prefetch; + final long timeSpan; + final TimeUnit unit; + final Scheduler.Worker timer; + final Supplier bufferSupplier; + private final Disposable.Swap currentTimeoutTask = Disposables.swap(); + + private @Nullable Subscription subscription; + private Queue queue; + + private @Nullable Throwable error; + private boolean done; // tracks unsatisfied downstream demand (expressed in # of buffers) volatile long requested; @@ -145,46 +154,30 @@ final static class BufferTimeoutWithBackpressureSubscriber REQUESTED = AtomicLongFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "requested"); - // tracks undelivered values in the current buffer - volatile int index; - @SuppressWarnings("rawtypes") - private AtomicIntegerFieldUpdater INDEX = - AtomicIntegerFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "index"); - - // tracks # of values requested from upstream but not delivered yet via this - // .onNext(v) - volatile long outstanding; + volatile long state; @SuppressWarnings("rawtypes") - private AtomicLongFieldUpdater OUTSTANDING = - AtomicLongFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "outstanding"); - - // indicates some thread is draining - volatile int wip; - @SuppressWarnings("rawtypes") - private AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "wip"); - - private volatile int terminated = NOT_TERMINATED; - @SuppressWarnings("rawtypes") - private AtomicIntegerFieldUpdater TERMINATED = - AtomicIntegerFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "terminated"); - - final static int NOT_TERMINATED = 0; - final static int TERMINATED_WITH_SUCCESS = 1; - final static int TERMINATED_WITH_ERROR = 2; - final static int TERMINATED_WITH_CANCEL = 3; - - @Nullable - private Subscription subscription; - - private Queue queue; - - @Nullable - Throwable error; - - boolean completed; - - Disposable currentTimeoutTask; + static final AtomicLongFieldUpdater STATE = + AtomicLongFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "state"); + + static final long CANCELLED_FLAG = + 0b1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long TERMINATED_FLAG = + 0b0100_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long HAS_WORK_IN_PROGRESS_FLAG = + 0b0010_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long TIMEOUT_FLAG = + 0b0001_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long REQUESTED_INDEX_MASK = + 0b0000_1111_1111_1111_1111_1111_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + static final long OUTSTANDING_MASK = + 0b0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111_0000_0000_0000_0000_0000L; + static final long INDEX_MASK = + 0b0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111L; + + static final int INDEX_SHIFT = 0; + static final int OUTSTANDING_SHIFT = 20; + static final int REQUESTED_INDEX_SHIFT = 40; + static final int INDEX_LIMIT = 1 << OUTSTANDING_SHIFT; // 1048576; // 2^20 public BufferTimeoutWithBackpressureSubscriber( CoreSubscriber actual, @@ -195,12 +188,18 @@ public BufferTimeoutWithBackpressureSubscriber( Supplier bufferSupplier, @Nullable Logger logger) { this.actual = actual; + // TODO: reconsider OUTSTANDING to be taken out of the mask to allow for higher value + // -> this translates to 4MiB of ints + if (batchSize >= INDEX_LIMIT) { + throw new IllegalArgumentException("Batch size can't exceed " + INDEX_LIMIT + " items"); + } this.batchSize = batchSize; this.timeSpan = timeSpan; this.unit = unit; this.timer = timer; this.bufferSupplier = bufferSupplier; this.logger = logger; + this.stateLogger = logger != null ? new StateLogger(logger) : null; this.prefetch = batchSize << 2; this.queue = Queues.get(prefetch).get(); } @@ -218,326 +217,333 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(T t) { + public CoreSubscriber actual() { + return this.actual; + } + + @Override + public void request(long n) { if (logger != null) { - trace(logger, "onNext: " + t); - } - // check if terminated (cancelled / error / completed) -> discard value if so - - // increment index - // append to buffer - // drain - - if (terminated == NOT_TERMINATED) { - // assume no more deliveries than requested - if (!queue.offer(t)) { - Context ctx = currentContext(); - Throwable error = Operators.onOperatorError(this.subscription, - Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL), - t, actual.currentContext()); - this.error = error; - if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) { - if (logger != null) { - trace(logger, "Failed to transition to error. Discarding " + t); - } - Operators.onDiscard(t, ctx); - Operators.onErrorDropped(error, ctx); - return; - } - Operators.onDiscard(t, ctx); - drain(); + trace(logger, "request " + n); + } + if (Operators.validate(n)) { + long previouslyRequested = Operators.addCap(REQUESTED, this, n); + if (previouslyRequested == Long.MAX_VALUE) { return; } - boolean shouldDrain = false; + long previousState; for (;;) { - int index = this.index; - if (INDEX.compareAndSet(this, index, index + 1)) { - if (index == 0) { - try { - if (logger != null) { - trace(logger, "timerStart"); - } - currentTimeoutTask = timer.schedule(this::bufferTimedOut, - timeSpan, - unit); - } catch (RejectedExecutionException ree) { - if (logger != null) { - trace(logger, "Timer rejected for " + t); - } - Context ctx = actual.currentContext(); - Throwable error = Operators.onRejectedExecution(ree, subscription, null, t, ctx); - this.error = error; - if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) { - Operators.onDiscardQueueWithClear(queue, currentContext(), null); - Operators.onErrorDropped(error, ctx); - return; - } - if (logger != null) { - trace(logger, "Draining upon timer rejection" + t); - } - drain(); - return; + previousState = this.state; + + if (queue.isEmpty() && (isTerminated(previousState) || isCancelled(previousState))) { + return; + } + + if (hasWorkInProgress(previousState)) { + // We let the active worker know about a change in demand. + long nextState = incrementRequestIndex(previousState); + if (STATE.compareAndSet(this, previousState, nextState)) { + if (this.stateLogger != null) { + this.stateLogger.log(this.toString(), "req", previousState, nextState); } + // If the state was replaced but there was work in progress, + // before leaving the protected section the working actor + // will notice an update and loop again to pick up the + // demand increase. If the active actor was done before our + // update, this update would have failed and we'd loop + // again to see the current state. + return; } - if ((index + 1) % batchSize == 0) { - shouldDrain = true; + // CAS failed, retry + continue; + } + + long nextState = previousState | HAS_WORK_IN_PROGRESS_FLAG; + + if (STATE.compareAndSet(this, previousState, nextState)) { + if (this.stateLogger != null) { + this.stateLogger.log(this.toString(), "req", previousState, nextState); } break; } } - if (shouldDrain) { - if (currentTimeoutTask != null) { - // TODO: it can happen that AFTER I dispose, the timeout - // anyway kicks during/after another onNext(), the buffer is - // delivered, and THEN drain is entered -> - // it would emit a buffer that is too small potentially. - // ALSO: - // It is also possible that here we deliver the buffer, but the - // timeout is happening for a new buffer! - currentTimeoutTask.dispose(); - } - this.index = 0; - } - drain(shouldDrain); - } else { - if (logger != null) { - trace(logger, "Discarding onNext: " + t); - } - Operators.onDiscard(t, currentContext()); + + // if there was no demand before - try to fulfill the demand if there + // are buffered values + drain(previouslyRequested == 0); } } @Override - public void onError(Throwable t) { - // set error flag - // set terminated as error - - // drain (WIP++ ?) - - if (currentTimeoutTask != null) { - currentTimeoutTask.dispose(); + public void onNext(T t) { + if (logger != null) { + trace(logger, "onNext " + t); } - timer.dispose(); - - if (!TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_ERROR)) { - Operators.onErrorDropped(t, currentContext()); + if (this.done) { + Operators.onNextDropped(t, this.actual.currentContext()); return; } - this.error = t; // wip in drain will publish the error - drain(); - } - @Override - public void onComplete() { - // set terminated as completed - // drain - if (currentTimeoutTask != null) { - currentTimeoutTask.dispose(); + boolean enqueued = queue.offer(t); + if (!enqueued) { + this.error = Operators.onOperatorError( + this.subscription, + Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL), + t, + this.actual.currentContext()); + Operators.onDiscard(t, this.actual.currentContext()); + } + long previousState = this.state; + + if (enqueued) { + // Only onNext increments the index. Drain can set it to 0 when it + // flushes. However, timeout does not reset it to 0, it has its own + // flag. + boolean terminated = false; + if (getIndex(previousState) == 0) { + // fire timer, new buffer starts + try { + Disposable disposable = + timer.schedule(this::bufferTimedOut, timeSpan, unit); + currentTimeoutTask.update(disposable); + } catch (RejectedExecutionException e) { + this.error = Operators.onRejectedExecution(e, subscription, null, t, actual.currentContext()); + terminated = true; + } + } + if (terminated) { + previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); + } else { + previousState = forceAddWork(this, state -> incrementIndex(state, 1)); + } + } else { + previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); } - timer.dispose(); - if (TERMINATED.compareAndSet(this, NOT_TERMINATED, TERMINATED_WITH_SUCCESS)) { - drain(); + if (!hasWorkInProgress(previousState)) { + drain(false); } } - @Override - public CoreSubscriber actual() { - return this.actual; - } + void bufferTimedOut() { + if (logger != null) { + trace(logger, "timedOut"); + } + if (this.done) { + return; + } - @Override - public void request(long n) { - // add cap to currently requested - // if previous requested was 0 -> drain first to deliver outdated values - // if the cap increased, request more ? + long previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTimedOut); - // drain + if (!hasWorkInProgress(previousState)) { + drain(false); + } + } - if (Operators.validate(n)) { - if (queue.isEmpty() && terminated != NOT_TERMINATED) { - return; - } + @Override + public void onError(Throwable t) { + if (logger != null) { + trace(logger, "onError " + t); + } + if (this.done) { + Operators.onErrorDropped(t, actual.currentContext()); + return; + } - if (Operators.addCap(REQUESTED, this, n) == 0) { - // there was no demand before - try to fulfill the demand if there - // are buffered values - drain(); - } + this.error = t; + long previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTerminated); - if (batchSize == Integer.MAX_VALUE || n == Long.MAX_VALUE) { - requestMore(Long.MAX_VALUE); - } else { - long requestLimit = prefetch; - if (requestLimit > outstanding) { - if (logger != null) { - trace(logger, "requestMore: " + (requestLimit - outstanding) + ", outstanding: " + outstanding); - } - requestMore(requestLimit - outstanding); - } - } + if (!hasWorkInProgress(previousState)) { + drain(false); } } - private void requestMore(long n) { - Subscription s = this.subscription; - if (s != null) { - Operators.addCap(OUTSTANDING, this, n); - s.request(n); + @Override + public void onComplete() { + if (logger != null) { + trace(logger, "onComplete"); + } + if (this.done) { + return; + } + + long previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); + + if (!hasWorkInProgress(previousState)) { + drain(false); } } @Override public void cancel() { - // set terminated flag - // cancel upstream subscription - // dispose timer - // drain for proper cleanup - if (logger != null) { trace(logger, "cancel"); } - TERMINATED.set(this, TERMINATED_WITH_CANCEL); - if (this.subscription != null) { - this.subscription.cancel(); - } - if (currentTimeoutTask != null) { - currentTimeoutTask.dispose(); + if (this.done || isCancelled(this.state)) { + return; } - timer.dispose(); - drain(); - } - void bufferTimedOut() { - // called when buffer times out + if (this.subscription != null) { + subscription.cancel(); + } - // reset index to 0 - // drain + long previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setCancelled); - // TODO: try comparing against current reference and see if it was not - // cancelled -> to do this, replace Disposable timeoutTask with volatile - // and use CAS. - if (logger != null) { - trace(logger, "timerFire"); + if (!hasWorkInProgress(previousState)) { + drain(false); } - this.index = 0; // if currently being drained, it means the buffer is - // delivered due to reaching the batchSize - drain(); } - private void drain() { - drain(true); - } - private void drain(boolean flush) { - // entering this should be guarded by WIP getAndIncrement == 0 - // if we're here it means do a flush if there is downstream demand - // regardless of queue size - - // loop: - // if terminated -> check error -> deliver; else complete downstream - // if cancelled + // Draining doesn't ensure exclusive access - the caller is responsible for + // ensuring that. + private void drain(boolean resumeDemand) { + for (;;) { + if (logger != null) { + trace(logger, "drain start"); + } + long previousState = this.state; + long currentState = previousState; - if (WIP.getAndIncrement(this) == 0) { - for (;;) { - int wip = this.wip; + if (done || isCancelled(currentState)) { if (logger != null) { - trace(logger, "drain. wip: " + wip); + trace(logger, "Discarding entire queue of " + queue.size()); } - boolean done = terminated != NOT_TERMINATED; - boolean cancelled = terminated == TERMINATED_WITH_CANCEL; - if (completed || cancelled) { - if (logger != null) { - trace(logger, "Discarding entire queue of " + queue.size()); - } - Operators.onDiscardQueueWithClear(queue, currentContext(), null); -// return; - } else { - if (flush) { - while (flushABuffer()) { - // no-op - } - } + Operators.onDiscardQueueWithClear(queue, currentContext(), null); + currentState = tryClearWip(this, currentState); + if (!hasWorkInProgress(currentState)) { + return; } - wip = this.wip; - if (wip == 1) { - if (done && !cancelled && !completed && queue.isEmpty()) { + } else { + long index = getIndex(currentState); + long currentRequest = this.requested; + boolean shouldFlush = currentRequest > 0 + && (resumeDemand || isTimedOut(currentState) || isTerminated(currentState) || index >= batchSize); + + int consumed = 0; + if (logger != null) { + trace(logger, "should flush: " + shouldFlush + " currentRequest: " + currentRequest + " index: " + index + " isTerminated: " + isTerminated(currentState) + " isTimedOut: " + isTimedOut(currentState)); + } + if (shouldFlush) { + currentTimeoutTask.update(null); + for (; ; ) { + int consumedNow = flush(); if (logger != null) { - trace(logger, "Completed with " + terminated); + trace(logger, "flushed: " + consumedNow); + } + // Need to make sure that if work is added we clear the + // resumeDemand with which we entered the drain loop as the + // state is now different + resumeDemand = false; + if (consumedNow == 0) { + break; } - completed = true; - if (this.error != null) { - actual.onError(this.error); + consumed += consumedNow; + if (currentRequest != Long.MAX_VALUE) { + currentRequest = REQUESTED.decrementAndGet(this); } - else { - actual.onComplete(); + if (currentRequest == 0) { + break; } } - } - if (WIP.decrementAndGet(this) == 0) { - return; - } - } - } - } - - boolean flushABuffer() { - long requested = this.requested; - if (requested != 0) { - T element; - C buffer; - - element = queue.poll(); - if (element == null) { - // there is demand, but queue is empty - return false; - } - buffer = bufferSupplier.get(); - int i = 0; - do { - buffer.add(element); - } while ((++i < batchSize) && ((element = queue.poll()) != null)); - - if (requested != Long.MAX_VALUE) { - requested = REQUESTED.decrementAndGet(this); - } - if (logger != null) { - trace(logger, "flush: " + buffer + ", now requested: " + requested); - } + } - actual.onNext(buffer); + boolean terminated = isTerminated(currentState); - if (requested != Long.MAX_VALUE) { - if (logger != null) { - trace(logger, "outstanding(" + outstanding + ") -= " + i); + if (consumed > 0) { +// currentState = addOutstanding(this, -consumed); + long decrement = -consumed; + currentState = forceUpdate(this, state -> addOutstanding(state, decrement)); + previousState = addOutstanding(previousState, decrement); } - long remaining = OUTSTANDING.addAndGet(this, -i); - if (terminated == NOT_TERMINATED) { + if (!terminated && currentRequest > 0) { + // request more from upstream + int remaining = getOutstanding(currentState); int replenishMark = prefetch >> 1; // TODO: create field limit instead if (remaining < replenishMark) { - if (logger != null) { - trace(logger, "replenish: " + (prefetch - remaining) + ", outstanding: " + outstanding); - } - requestMore(prefetch - remaining); + currentState = requestMore(prefetch - remaining); + previousState = addOutstanding(previousState, prefetch - remaining); } } - if (requested <= 0) { - return false; + if (terminated && queue.isEmpty()) { + // TODO: make sure we don't do this unless we + // know all the values were delivered first + done = true; + if (logger != null) { + trace(logger, "terminated! error: " + this.error + " queue size: " + queue.size()); + } + if (this.error != null) { + Operators.onDiscardQueueWithClear(queue, currentContext(), null); + actual.onError(this.error); + } else if (queue.isEmpty()) { + actual.onComplete(); + } + } + + if (consumed > 0) { + int toDecrement = -consumed; + currentState = forceUpdate(this, state -> resetTimeout(incrementIndex(state, toDecrement))); + previousState = resetTimeout(incrementIndex(previousState, toDecrement)); + } + + // TODO: If we force an update before we lose the knowledge that + // some other actor modified something in between, so we must make + // sure to track these updates in both current and previous and + // compare. + currentState = tryClearWip(this, previousState); + + // If the state changed (e.g. new item arrived, a request was issued, + // cancellation, error, completion) we will loop again. + if (!hasWorkInProgress(currentState)) { + if (logger != null) { + trace(logger, "drain done"); + } + return; + } + if (logger != null) { + trace(logger, "drain repeat"); } } - // continue to see if there's more - return true; } - return false; + } + + int flush() { + T element; + C buffer; + + element = queue.poll(); + if (element == null) { + // there is demand, but queue is empty + return 0; + } + buffer = bufferSupplier.get(); + int i = 0; + do { + buffer.add(element); + } while ((++i < batchSize) && ((element = queue.poll()) != null)); + + actual.onNext(buffer); + + return i; + } + + private long requestMore(int n) { + if (logger != null) { + trace(logger, "requestMore " + n); + } + long currentState = forceUpdate(this, state -> addOutstanding(state, n)); + Objects.requireNonNull(this.subscription).request(n); + return currentState; } @Override public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return this.subscription; - if (key == Attr.CANCELLED) return terminated == TERMINATED_WITH_CANCEL; - if (key == Attr.TERMINATED) return terminated == TERMINATED_WITH_ERROR || terminated == TERMINATED_WITH_SUCCESS; + if (key == Attr.CANCELLED) return isCancelled(this.state); + if (key == Attr.TERMINATED) return isTerminated(this.state); if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested; if (key == Attr.CAPACITY) return prefetch; // TODO: revise if (key == Attr.BUFFERED) return queue.size(); @@ -546,6 +552,132 @@ public Object scanUnsafe(Attr key) { return InnerOperator.super.scanUnsafe(key); } + + private static long bitwiseInc(long state, long mask, long shift, int amount) { + long shiftAndAdd = ((state & mask) >> shift) + amount; + long shiftBackAndLimit = (shiftAndAdd << shift) & mask; + long clearedState = state & ~mask; + return clearedState | shiftBackAndLimit; + } + + private static boolean isTerminated(long state) { + return (state & TERMINATED_FLAG) == TERMINATED_FLAG; + } + + private static long setTerminated(long state) { + return state | TERMINATED_FLAG; + } + + private static boolean isCancelled(long state) { + return (state & CANCELLED_FLAG) == CANCELLED_FLAG; + } + + private static long setCancelled(long state) { + return state | CANCELLED_FLAG; + } + + private static long incrementRequestIndex(long state) { + return bitwiseInc(state, REQUESTED_INDEX_MASK, REQUESTED_INDEX_SHIFT, 1); + } + + private static long getIndex(long state) { + long l = (state & INDEX_MASK) >> INDEX_SHIFT; + return l; + } + + private static long incrementIndex(long state, int amount) { + return bitwiseInc(state, INDEX_MASK, INDEX_SHIFT, amount); + } + + private static boolean hasWorkInProgress(long state) { + return (state & HAS_WORK_IN_PROGRESS_FLAG) == HAS_WORK_IN_PROGRESS_FLAG; + } + + private static long setWorkInProgress(long state) { + return state | HAS_WORK_IN_PROGRESS_FLAG; + } + + private static long setTimedOut(long state) { + return state | TIMEOUT_FLAG; + } + + private static long resetTimeout(long state) { + return state & ~TIMEOUT_FLAG; + } + + private static long resetIndex(long state) { + return state & ~INDEX_MASK; + } + + private static boolean isTimedOut(long state) { + return (state & TIMEOUT_FLAG) == TIMEOUT_FLAG; + } + + private static long forceAddWork(BufferTimeoutWithBackpressureSubscriber instance, Function f) { + for (;;) { + long previousState = instance.state; + long nextState = f.apply(previousState) | HAS_WORK_IN_PROGRESS_FLAG; + if (STATE.compareAndSet(instance, previousState, nextState)) { + if (instance.stateLogger != null) { + instance.stateLogger.log(instance.toString(), + "faw", + previousState, + nextState); + } + return previousState; + } + } + } + + private static long forceUpdate(BufferTimeoutWithBackpressureSubscriber instance, Function f) { + for (;;) { + long previousState = instance.state; + long nextState = f.apply(previousState); + if (STATE.compareAndSet(instance, previousState, nextState)) { + if (instance.stateLogger != null) { + instance.stateLogger.log(instance.toString(), + "fup", + previousState, + nextState); + } + return nextState; + } + } + } + + private static int getOutstanding(long state) { + return (int) ((state & OUTSTANDING_MASK) >> OUTSTANDING_SHIFT); + } + + private static long addOutstanding(long state, long amount) { + long previousWithOutstandingClear = state &~ OUTSTANDING_MASK; + long outstandingMax = OUTSTANDING_MASK >> OUTSTANDING_SHIFT; + long current = (state & OUTSTANDING_MASK) >> OUTSTANDING_SHIFT; + long added = Math.min(current + amount, outstandingMax); + long newOutstanding = ((long) added) << OUTSTANDING_SHIFT; + return previousWithOutstandingClear | newOutstanding; + } + + private static > long tryClearWip(BufferTimeoutWithBackpressureSubscriber instance, long expectedState) { + for (;;) { + final long currentState = instance.state; + + if (expectedState != currentState) { + return currentState; + } + + long nextState = currentState & ~HAS_WORK_IN_PROGRESS_FLAG; + if (STATE.compareAndSet(instance, currentState, nextState)) { + if (instance.stateLogger != null) { + instance.stateLogger.log(instance.toString(), + "wcl", + currentState, + nextState); + } + return nextState; + } + } + } } final static class BufferTimeoutSubscriber> diff --git a/reactor-core/src/main/java/reactor/core/publisher/StateLogger.java b/reactor-core/src/main/java/reactor/core/publisher/StateLogger.java index e24a071e45..39e6c0deb6 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/StateLogger.java +++ b/reactor-core/src/main/java/reactor/core/publisher/StateLogger.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,7 +49,7 @@ void log(String instance, formatState(committedState, 64)), new RuntimeException()); } else { - this.logger.trace(String.format("[%s][%s][%s][%s-%s]", + this.logger.trace(String.format("[%s][%s][%s][\n\t%s\n\t%s]", instance, action, Thread.currentThread() diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutFairBackpressureTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutFairBackpressureTest.java index 01b21276a5..20a2237a9a 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutFairBackpressureTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutFairBackpressureTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -62,6 +63,7 @@ public void tearDown() { } @Test + @Tag("slow") void backpressureSupported() throws InterruptedException { final int eventProducerDelayMillis = 200; // Event Producer emits requested items to downstream with a 200ms delay @@ -365,7 +367,7 @@ public void requestedFromUpstreamShouldNotExceedDownstreamDemand() { .assertNext(s -> assertThat(s).containsExactly("a")) .then(() -> assertThat(requestedOutstanding).hasValue(19)) .thenRequest(1) - .then(() -> assertThat(requestedOutstanding).hasValue(20)) + .then(() -> assertThat(requestedOutstanding).hasValue(19)) .thenCancel() .verify(); } @@ -392,6 +394,7 @@ public void scanSubscriberCancelled() { @Test public void bufferTimeoutShouldNotRaceWithNext() { Set seen = new HashSet<>(); + AtomicBoolean complete = new AtomicBoolean(); Consumer> consumer = integers -> { for (Integer i : integers) { if (!seen.add(i)) { @@ -399,7 +402,8 @@ public void bufferTimeoutShouldNotRaceWithNext() { } } }; - CoreSubscriber> actual = new LambdaSubscriber<>(consumer, null, null, null); + CoreSubscriber> actual = new LambdaSubscriber<>( + consumer, null, () -> complete.set(true), null); FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber> test = new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber>( @@ -419,6 +423,7 @@ public void bufferTimeoutShouldNotRaceWithNext() { test.onComplete(); assertThat(seen.size()).isEqualTo(500); + assertThat(complete.get()).isTrue(); } //see https://github.com/reactor/reactor-core/issues/1247 diff --git a/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java b/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java index 22d323bc9f..245c4ddef6 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java @@ -157,8 +157,7 @@ public class OnDiscardShouldNotLeakTest { DiscardScenario.fluxSource("monoFilterWhenFalse", main -> main.last().filterWhen(__ -> Mono.just(false).hide())), DiscardScenario.fluxSource("last", main -> main.last(new Tracked("default")).flatMap(f -> Mono.just(f).hide())), DiscardScenario.fluxSource("flatMapIterable", f -> f.flatMapIterable(Arrays::asList)), - // FIXME: uncomment once https://github.com/reactor/reactor-core/issues/3531 is resolved -// DiscardScenario.fluxSource("bufferTimeout", f -> f.bufferTimeout(2, Duration.ofMillis(1), true).flatMapIterable(Function.identity())), + DiscardScenario.fluxSource("bufferTimeout", f -> f.bufferTimeout(2, Duration.ofMillis(1), true).flatMapIterable(Function.identity())), DiscardScenario.fluxSource("publishOnDelayErrors", f -> f.publishOn(Schedulers.immediate())), DiscardScenario.fluxSource("publishOnImmediateErrors", f -> f.publishOn(Schedulers.immediate(), false, Queues.SMALL_BUFFER_SIZE)), DiscardScenario.fluxSource("publishOnAndPublishOn", main -> main From d74b24f3c0d2329a8a8707da6fcd54fae7ffcc79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Tue, 30 Apr 2024 14:54:23 +0200 Subject: [PATCH 05/10] Fixed issue with timer firing immediately before index got incremented --- .../core/publisher/FluxBufferTimeout.java | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index a0c6648832..6fa695d19e 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -301,6 +301,14 @@ public void onNext(T t) { // flushes. However, timeout does not reset it to 0, it has its own // flag. boolean terminated = false; + if (terminated) { + previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); + } else { + previousState = forceAddWork(this, state -> incrementIndex(state, 1)); + } + // We can only fire the timer once we increment the index first so that + // the timer doesn't fire first as it would consume the element and try + // to decrement the index below 0. if (getIndex(previousState) == 0) { // fire timer, new buffer starts try { @@ -312,11 +320,6 @@ public void onNext(T t) { terminated = true; } } - if (terminated) { - previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); - } else { - previousState = forceAddWork(this, state -> incrementIndex(state, 1)); - } } else { previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); } @@ -469,8 +472,6 @@ private void drain(boolean resumeDemand) { } if (terminated && queue.isEmpty()) { - // TODO: make sure we don't do this unless we - // know all the values were delivered first done = true; if (logger != null) { trace(logger, "terminated! error: " + this.error + " queue size: " + queue.size()); @@ -489,10 +490,6 @@ private void drain(boolean resumeDemand) { previousState = resetTimeout(incrementIndex(previousState, toDecrement)); } - // TODO: If we force an update before we lose the knowledge that - // some other actor modified something in between, so we must make - // sure to track these updates in both current and previous and - // compare. currentState = tryClearWip(this, previousState); // If the state changed (e.g. new item arrived, a request was issued, @@ -581,8 +578,7 @@ private static long incrementRequestIndex(long state) { } private static long getIndex(long state) { - long l = (state & INDEX_MASK) >> INDEX_SHIFT; - return l; + return (state & INDEX_MASK) >> INDEX_SHIFT; } private static long incrementIndex(long state, int amount) { @@ -605,10 +601,6 @@ private static long resetTimeout(long state) { return state & ~TIMEOUT_FLAG; } - private static long resetIndex(long state) { - return state & ~INDEX_MASK; - } - private static boolean isTimedOut(long state) { return (state & TIMEOUT_FLAG) == TIMEOUT_FLAG; } @@ -666,7 +658,8 @@ private static > long tryClearWip(BufferTimeo return currentState; } - long nextState = currentState & ~HAS_WORK_IN_PROGRESS_FLAG; + // remove both WIP and requested_index so that we avoid overflowing + long nextState = currentState & ~HAS_WORK_IN_PROGRESS_FLAG & ~REQUESTED_INDEX_MASK; if (STATE.compareAndSet(instance, currentState, nextState)) { if (instance.stateLogger != null) { instance.stateLogger.log(instance.toString(), From 156711cc721d21f92bc86e4bdfc1aa0b2e91b7a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Tue, 30 Apr 2024 17:33:32 +0200 Subject: [PATCH 06/10] Removed outstanding from state and fixed bug with termination upon timer scheduling rejection --- .../core/publisher/FluxBufferTimeout.java | 55 +++++-------------- 1 file changed, 14 insertions(+), 41 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index 6fa695d19e..2df7470d66 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -147,6 +147,10 @@ final static class BufferTimeoutWithBackpressureSubscriber actual, @@ -188,11 +188,6 @@ public BufferTimeoutWithBackpressureSubscriber( Supplier bufferSupplier, @Nullable Logger logger) { this.actual = actual; - // TODO: reconsider OUTSTANDING to be taken out of the mask to allow for higher value - // -> this translates to 4MiB of ints - if (batchSize >= INDEX_LIMIT) { - throw new IllegalArgumentException("Batch size can't exceed " + INDEX_LIMIT + " items"); - } this.batchSize = batchSize; this.timeSpan = timeSpan; this.unit = unit; @@ -301,11 +296,7 @@ public void onNext(T t) { // flushes. However, timeout does not reset it to 0, it has its own // flag. boolean terminated = false; - if (terminated) { - previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); - } else { - previousState = forceAddWork(this, state -> incrementIndex(state, 1)); - } + previousState = forceAddWork(this, state -> incrementIndex(state, 1)); // We can only fire the timer once we increment the index first so that // the timer doesn't fire first as it would consume the element and try // to decrement the index below 0. @@ -317,7 +308,7 @@ public void onNext(T t) { currentTimeoutTask.update(disposable); } catch (RejectedExecutionException e) { this.error = Operators.onRejectedExecution(e, subscription, null, t, actual.currentContext()); - terminated = true; + previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); } } } else { @@ -456,18 +447,14 @@ private void drain(boolean resumeDemand) { boolean terminated = isTerminated(currentState); if (consumed > 0) { -// currentState = addOutstanding(this, -consumed); - long decrement = -consumed; - currentState = forceUpdate(this, state -> addOutstanding(state, decrement)); - previousState = addOutstanding(previousState, decrement); + outstanding -= consumed; } if (!terminated && currentRequest > 0) { // request more from upstream - int remaining = getOutstanding(currentState); + int remaining = this.outstanding; int replenishMark = prefetch >> 1; // TODO: create field limit instead if (remaining < replenishMark) { - currentState = requestMore(prefetch - remaining); - previousState = addOutstanding(previousState, prefetch - remaining); + requestMore(prefetch - remaining); } } @@ -527,13 +514,12 @@ int flush() { return i; } - private long requestMore(int n) { + private void requestMore(int n) { if (logger != null) { trace(logger, "requestMore " + n); } - long currentState = forceUpdate(this, state -> addOutstanding(state, n)); + outstanding += n; Objects.requireNonNull(this.subscription).request(n); - return currentState; } @Override @@ -637,19 +623,6 @@ private static long forceUpdate(BufferTimeoutWithBackpressureSubscriber in } } - private static int getOutstanding(long state) { - return (int) ((state & OUTSTANDING_MASK) >> OUTSTANDING_SHIFT); - } - - private static long addOutstanding(long state, long amount) { - long previousWithOutstandingClear = state &~ OUTSTANDING_MASK; - long outstandingMax = OUTSTANDING_MASK >> OUTSTANDING_SHIFT; - long current = (state & OUTSTANDING_MASK) >> OUTSTANDING_SHIFT; - long added = Math.min(current + amount, outstandingMax); - long newOutstanding = ((long) added) << OUTSTANDING_SHIFT; - return previousWithOutstandingClear | newOutstanding; - } - private static > long tryClearWip(BufferTimeoutWithBackpressureSubscriber instance, long expectedState) { for (;;) { final long currentState = instance.state; From 2ae60ecdf71efa68bbce1ab2f4a3a2654e7779e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 22 May 2024 10:41:31 +0200 Subject: [PATCH 07/10] Non-concurrent benchmark --- .../publisher/FluxBufferTimeoutBenchmark.java | 104 ++++++++++++++++++ .../core/publisher/FluxBufferTimeoutTest.java | 2 + 2 files changed, 106 insertions(+) create mode 100644 benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java diff --git a/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java new file mode 100644 index 0000000000..81cfa75225 --- /dev/null +++ b/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java @@ -0,0 +1,104 @@ +package reactor.core.publisher; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; + +@BenchmarkMode({Mode.AverageTime}) +@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +public class FluxBufferTimeoutBenchmark { + + private static final int TOTAL_VALUES = 100; + + @Param({"1", "10", "100"}) + int bufferSize; + + @Benchmark + public Object unlimited(Blackhole blackhole) throws InterruptedException { + JmhSubscriber subscriber = new JmhSubscriber(blackhole, false); + Flux.range(0, TOTAL_VALUES) + .bufferTimeout(bufferSize, Duration.ofDays(100), true) + .subscribe(subscriber); + subscriber.await(); + return subscriber; + } + + @Benchmark + public Object oneByOne(Blackhole blackhole) throws InterruptedException { + JmhSubscriber subscriber = new JmhSubscriber(blackhole, true); + Flux.range(0, TOTAL_VALUES) + .bufferTimeout(bufferSize, Duration.ofDays(100), true) + .subscribe(subscriber); + subscriber.await(); + return subscriber; + } + + + public static class JmhSubscriber extends CountDownLatch + implements CoreSubscriber> { + + private final Blackhole blackhole; + private final boolean oneByOneRequest; + private Subscription s; + + public JmhSubscriber(Blackhole blackhole, boolean oneByOneRequest) { + super(1); + this.blackhole = blackhole; + this.oneByOneRequest = oneByOneRequest; + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + if (oneByOneRequest) { + s.request(1); + } else { + s.request(Long.MAX_VALUE); + } + } + } + + @Override + public void onNext(List t) { + blackhole.consume(t); + if (oneByOneRequest) { + s.request(1); + } + } + + @Override + public void onError(Throwable t) { + blackhole.consume(t); + countDown(); + } + + @Override + public void onComplete() { + countDown(); + } + } + + public static void main(String[] args) { + + } +} diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java index e750a5df96..2122941ad2 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,6 +36,7 @@ import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; +import reactor.core.Disposable; import reactor.core.Exceptions; import reactor.core.Scannable; import reactor.core.scheduler.Scheduler; From f61b1fd35a2bf9974139532859231742b2a07618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 22 May 2024 10:42:24 +0200 Subject: [PATCH 08/10] clean up imports --- .../test/java/reactor/core/publisher/FluxBufferTimeoutTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java index 2122941ad2..e750a5df96 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,7 +35,6 @@ import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; -import reactor.core.Disposable; import reactor.core.Exceptions; import reactor.core.Scannable; import reactor.core.scheduler.Scheduler; From 50df5e870563228cea700d79577981247a4c2252 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 22 May 2024 10:46:45 +0200 Subject: [PATCH 09/10] Remove main method from benchmark --- .../reactor/core/publisher/FluxBufferTimeoutBenchmark.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java index 81cfa75225..4c29791724 100644 --- a/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java +++ b/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java @@ -97,8 +97,4 @@ public void onComplete() { countDown(); } } - - public static void main(String[] args) { - - } } From a548a96a788bff21394001d37e699670221bd3d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 22 May 2024 16:35:06 +0200 Subject: [PATCH 10/10] Simplify request method, add comments, clean up --- .../publisher/FluxBufferTimeoutBenchmark.java | 16 ++ .../core/publisher/FluxBufferTimeout.java | 225 +++++++++++------- 2 files changed, 156 insertions(+), 85 deletions(-) diff --git a/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java index 4c29791724..9da6292ce9 100644 --- a/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java +++ b/benchmarks/src/main/java/reactor/core/publisher/FluxBufferTimeoutBenchmark.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package reactor.core.publisher; import java.time.Duration; diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index 2df7470d66..1fa01762a4 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -131,36 +131,52 @@ public Object scanUnsafe(Attr key) { final static class BufferTimeoutWithBackpressureSubscriber> implements InnerOperator { - final @Nullable Logger logger; - final @Nullable StateLogger stateLogger; - final CoreSubscriber actual; - final int batchSize; - final int prefetch; - final long timeSpan; - final TimeUnit unit; - final Scheduler.Worker timer; - final Supplier bufferSupplier; - private final Disposable.Swap currentTimeoutTask = Disposables.swap(); + private final @Nullable Logger logger; + private final @Nullable StateLogger stateLogger; + private final CoreSubscriber actual; + private final int batchSize; + private final int prefetch; + private final int replenishMark; + private final long timeSpan; + private final TimeUnit unit; + private final Scheduler.Worker timer; + private final Supplier bufferSupplier; + private final Disposable.Swap currentTimeoutTask = Disposables.swap(); + private final Queue queue; private @Nullable Subscription subscription; - private Queue queue; private @Nullable Throwable error; + /** + * Flag used to mark that the operator is definitely done processing all state + * transitions. + */ private boolean done; - // Access to outstanding is always guarded by volatile access to state so it - // needn't be volatile. It is also only ever accessed in the drain method so it - // needn't be part of state either. + /** + * Access to outstanding is always guarded by volatile access to state, so it + * needn't be volatile. It is also only ever accessed in the drain method, so + * it needn't be part of state either. + */ private int outstanding; - // tracks unsatisfied downstream demand (expressed in # of buffers) + /** + * Tracks unsatisfied downstream demand (expressed in # of buffers). Package + * visibility for testing purposes. + */ volatile long requested; @SuppressWarnings("rawtypes") - private AtomicLongFieldUpdater REQUESTED = + private static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "requested"); - volatile long state; + /** + * The state field serves as the coordination point for multiple actors. The + * surrounding implementation works as a state machine that provides mutual + * exclusion with lock-free semantics. It uses bit masks to divide a 64-bit + * long value for multiple concerns while maintaining atomicity with CAS operations. + */ + private volatile long state; @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater STATE = + private static final AtomicLongFieldUpdater STATE = AtomicLongFieldUpdater.newUpdater(BufferTimeoutWithBackpressureSubscriber.class, "state"); static final long CANCELLED_FLAG = @@ -176,8 +192,8 @@ final static class BufferTimeoutWithBackpressureSubscriber actual, @@ -196,10 +212,11 @@ public BufferTimeoutWithBackpressureSubscriber( this.logger = logger; this.stateLogger = logger != null ? new StateLogger(logger) : null; this.prefetch = batchSize << 2; + this.replenishMark = batchSize << 1; this.queue = Queues.get(prefetch).get(); } - private void trace(Logger logger, String msg) { + private static void trace(Logger logger, String msg) { logger.trace(String.format("[%s][%s]", Thread.currentThread().getId(), msg)); } @@ -228,45 +245,13 @@ public void request(long n) { } long previousState; - for (;;) { - previousState = this.state; - - if (queue.isEmpty() && (isTerminated(previousState) || isCancelled(previousState))) { - return; - } - - if (hasWorkInProgress(previousState)) { - // We let the active worker know about a change in demand. - long nextState = incrementRequestIndex(previousState); - if (STATE.compareAndSet(this, previousState, nextState)) { - if (this.stateLogger != null) { - this.stateLogger.log(this.toString(), "req", previousState, nextState); - } - // If the state was replaced but there was work in progress, - // before leaving the protected section the working actor - // will notice an update and loop again to pick up the - // demand increase. If the active actor was done before our - // update, this update would have failed and we'd loop - // again to see the current state. - return; - } - // CAS failed, retry - continue; - } - - long nextState = previousState | HAS_WORK_IN_PROGRESS_FLAG; - - if (STATE.compareAndSet(this, previousState, nextState)) { - if (this.stateLogger != null) { - this.stateLogger.log(this.toString(), "req", previousState, nextState); - } - break; - } + previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::incrementRequestIndex); + if (!hasWorkInProgress(previousState)) { + // If there was no demand before - try to fulfill the demand if there + // are buffered values. + drain(previouslyRequested == 0); } - - // if there was no demand before - try to fulfill the demand if there - // are buffered values - drain(previouslyRequested == 0); } } @@ -289,14 +274,15 @@ public void onNext(T t) { this.actual.currentContext()); Operators.onDiscard(t, this.actual.currentContext()); } - long previousState = this.state; + + long previousState; if (enqueued) { // Only onNext increments the index. Drain can set it to 0 when it // flushes. However, timeout does not reset it to 0, it has its own // flag. - boolean terminated = false; previousState = forceAddWork(this, state -> incrementIndex(state, 1)); + // We can only fire the timer once we increment the index first so that // the timer doesn't fire first as it would consume the element and try // to decrement the index below 0. @@ -308,11 +294,13 @@ public void onNext(T t) { currentTimeoutTask.update(disposable); } catch (RejectedExecutionException e) { this.error = Operators.onRejectedExecution(e, subscription, null, t, actual.currentContext()); - previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); + previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTerminated); } } } else { - previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); + previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTerminated); } if (!hasWorkInProgress(previousState)) { @@ -328,7 +316,8 @@ void bufferTimedOut() { return; } - long previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTimedOut); + long previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTimedOut); if (!hasWorkInProgress(previousState)) { drain(false); @@ -363,7 +352,8 @@ public void onComplete() { return; } - long previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); + long previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setTerminated); if (!hasWorkInProgress(previousState)) { drain(false); @@ -383,20 +373,27 @@ public void cancel() { subscription.cancel(); } - long previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setCancelled); + long previousState = forceAddWork(this, + BufferTimeoutWithBackpressureSubscriber::setCancelled); if (!hasWorkInProgress(previousState)) { drain(false); } } - // Draining doesn't ensure exclusive access - the caller is responsible for - // ensuring that. + /** + * Drain the queue and perform any actions that result from the current state. + * Ths method must only be called when the caller ensured exclusive access. + * That means that it successfully indicated there's work by setting the WIP flag. + * + * @param resumeDemand {@code true} if the previous {@link #requested demand} + * value was 0. + */ private void drain(boolean resumeDemand) { + if (logger != null) { + trace(logger, "drain start"); + } for (;;) { - if (logger != null) { - trace(logger, "drain start"); - } long previousState = this.state; long currentState = previousState; @@ -417,7 +414,11 @@ private void drain(boolean resumeDemand) { int consumed = 0; if (logger != null) { - trace(logger, "should flush: " + shouldFlush + " currentRequest: " + currentRequest + " index: " + index + " isTerminated: " + isTerminated(currentState) + " isTimedOut: " + isTimedOut(currentState)); + trace(logger, "should flush: " + shouldFlush + + " currentRequest: " + currentRequest + + " index: " + index + + " isTerminated: " + isTerminated(currentState) + + " isTimedOut: " + isTimedOut(currentState)); } if (shouldFlush) { currentTimeoutTask.update(null); @@ -426,9 +427,9 @@ private void drain(boolean resumeDemand) { if (logger != null) { trace(logger, "flushed: " + consumedNow); } - // Need to make sure that if work is added we clear the + // We need to make sure that if work is added we clear the // resumeDemand with which we entered the drain loop as the - // state is now different + // state is now different. resumeDemand = false; if (consumedNow == 0) { break; @@ -441,7 +442,6 @@ private void drain(boolean resumeDemand) { break; } } - } boolean terminated = isTerminated(currentState); @@ -450,9 +450,8 @@ private void drain(boolean resumeDemand) { outstanding -= consumed; } if (!terminated && currentRequest > 0) { - // request more from upstream + // Request more from upstream. int remaining = this.outstanding; - int replenishMark = prefetch >> 1; // TODO: create field limit instead if (remaining < replenishMark) { requestMore(prefetch - remaining); } @@ -500,7 +499,7 @@ int flush() { element = queue.poll(); if (element == null) { - // there is demand, but queue is empty + // There is demand, but the queue is empty. return 0; } buffer = bufferSupplier.get(); @@ -536,7 +535,19 @@ public Object scanUnsafe(Attr key) { return InnerOperator.super.scanUnsafe(key); } - private static long bitwiseInc(long state, long mask, long shift, int amount) { + /* + Below are bit field operations that aid in transitioning the state machine. + An actual update to the state field is achieved with force* method prefix. + The try* prefix indicates that if an update is unsuccessful it will return with + the current state value instead of the intended one. + In these stateful operations we use the StateLogger to indicate transitions + happening. Below are the 3-letter acronyms used: + - faw = forceAddWork + - fup = forceUpdate + - wcl = WIP cleared (meaning work-in-progress and request fields were cleared) + */ + + private static long bitwiseIncrement(long state, long mask, long shift, int amount) { long shiftAndAdd = ((state & mask) >> shift) + amount; long shiftBackAndLimit = (shiftAndAdd << shift) & mask; long clearedState = state & ~mask; @@ -560,7 +571,7 @@ private static long setCancelled(long state) { } private static long incrementRequestIndex(long state) { - return bitwiseInc(state, REQUESTED_INDEX_MASK, REQUESTED_INDEX_SHIFT, 1); + return bitwiseIncrement(state, REQUESTED_INDEX_MASK, REQUESTED_INDEX_SHIFT, 1); } private static long getIndex(long state) { @@ -568,7 +579,7 @@ private static long getIndex(long state) { } private static long incrementIndex(long state, int amount) { - return bitwiseInc(state, INDEX_MASK, INDEX_SHIFT, amount); + return bitwiseIncrement(state, INDEX_MASK, INDEX_SHIFT, amount); } private static boolean hasWorkInProgress(long state) { @@ -591,7 +602,21 @@ private static boolean isTimedOut(long state) { return (state & TIMEOUT_FLAG) == TIMEOUT_FLAG; } - private static long forceAddWork(BufferTimeoutWithBackpressureSubscriber instance, Function f) { + /** + * Force a state update and return the state that the update is based upon. + * If the state was replaced but there was work in progress, before leaving the + * protected section the working actor will notice an update and loop again to + * pick up the update (e.g. demand increase). If there was no active actor or + * the active actor was done before our update, the caller of this method is + * obliged to check whether the returned value (the previousState) had WIP flag + * set. In case it was not, it should call the drain procedure. + * + * @param instance target of CAS operations + * @param f transformation to apply to state + * @return state value on which the effective update is based (previousState) + */ + private static long forceAddWork( + BufferTimeoutWithBackpressureSubscriber instance, Function f) { for (;;) { long previousState = instance.state; long nextState = f.apply(previousState) | HAS_WORK_IN_PROGRESS_FLAG; @@ -607,7 +632,19 @@ private static long forceAddWork(BufferTimeoutWithBackpressureSubscriber i } } - private static long forceUpdate(BufferTimeoutWithBackpressureSubscriber instance, Function f) { + /** + * Unconditionally force the state transition and return the new state instead + * of the old one. The caller has no way to know whether the update happened + * while something else had WIP flag set. Therefore, this method can only be + * used in the drain procedure where the caller knows that the WIP flag is set + * and doesn't need to make that inference. + * + * @param instance target of CAS operations + * @param f transformation to apply to state + * @return effective state value (nextState) + */ + private static long forceUpdate( + BufferTimeoutWithBackpressureSubscriber instance, Function f) { for (;;) { long previousState = instance.state; long nextState = f.apply(previousState); @@ -623,7 +660,25 @@ private static long forceUpdate(BufferTimeoutWithBackpressureSubscriber in } } - private static > long tryClearWip(BufferTimeoutWithBackpressureSubscriber instance, long expectedState) { + /** + * Attempt to clear the work-in-progress (WIP) flag. If the current state + * doesn't match the expected state, the current state is returned and the flag + * is not cleared. Otherwise, the effective new state is returned that has: + *
    + *
  • WIP cleared
  • + *
  • Requested index cleared
  • + *
+ * + * @param instance target of CAS operations + * @param expectedState the state reading on which the caller bases the + * intention to remove the WIP flag. In case it's + * currently different, the caller should repeat the + * drain procedure to notice any updates. + * @return current state value (currentState in case of expectations + * mismatch or nextState in case of successful WIP clearing) + */ + private static > long tryClearWip( + BufferTimeoutWithBackpressureSubscriber instance, long expectedState) { for (;;) { final long currentState = instance.state; @@ -631,7 +686,7 @@ private static > long tryClearWip(BufferTimeo return currentState; } - // remove both WIP and requested_index so that we avoid overflowing + // Remove both WIP and requested_index so that we avoid overflowing long nextState = currentState & ~HAS_WORK_IN_PROGRESS_FLAG & ~REQUESTED_INDEX_MASK; if (STATE.compareAndSet(instance, currentState, nextState)) { if (instance.stateLogger != null) {