From 156711cc721d21f92bc86e4bdfc1aa0b2e91b7a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Tue, 30 Apr 2024 17:33:32 +0200 Subject: [PATCH] Removed outstanding from state and fixed bug with termination upon timer scheduling rejection --- .../core/publisher/FluxBufferTimeout.java | 55 +++++-------------- 1 file changed, 14 insertions(+), 41 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index 6fa695d19e..2df7470d66 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -147,6 +147,10 @@ final static class BufferTimeoutWithBackpressureSubscriber actual, @@ -188,11 +188,6 @@ public BufferTimeoutWithBackpressureSubscriber( Supplier bufferSupplier, @Nullable Logger logger) { this.actual = actual; - // TODO: reconsider OUTSTANDING to be taken out of the mask to allow for higher value - // -> this translates to 4MiB of ints - if (batchSize >= INDEX_LIMIT) { - throw new IllegalArgumentException("Batch size can't exceed " + INDEX_LIMIT + " items"); - } this.batchSize = batchSize; this.timeSpan = timeSpan; this.unit = unit; @@ -301,11 +296,7 @@ public void onNext(T t) { // flushes. However, timeout does not reset it to 0, it has its own // flag. boolean terminated = false; - if (terminated) { - previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); - } else { - previousState = forceAddWork(this, state -> incrementIndex(state, 1)); - } + previousState = forceAddWork(this, state -> incrementIndex(state, 1)); // We can only fire the timer once we increment the index first so that // the timer doesn't fire first as it would consume the element and try // to decrement the index below 0. @@ -317,7 +308,7 @@ public void onNext(T t) { currentTimeoutTask.update(disposable); } catch (RejectedExecutionException e) { this.error = Operators.onRejectedExecution(e, subscription, null, t, actual.currentContext()); - terminated = true; + previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); } } } else { @@ -456,18 +447,14 @@ private void drain(boolean resumeDemand) { boolean terminated = isTerminated(currentState); if (consumed > 0) { -// currentState = addOutstanding(this, -consumed); - long decrement = -consumed; - currentState = forceUpdate(this, state -> addOutstanding(state, decrement)); - previousState = addOutstanding(previousState, decrement); + outstanding -= consumed; } if (!terminated && currentRequest > 0) { // request more from upstream - int remaining = getOutstanding(currentState); + int remaining = this.outstanding; int replenishMark = prefetch >> 1; // TODO: create field limit instead if (remaining < replenishMark) { - currentState = requestMore(prefetch - remaining); - previousState = addOutstanding(previousState, prefetch - remaining); + requestMore(prefetch - remaining); } } @@ -527,13 +514,12 @@ int flush() { return i; } - private long requestMore(int n) { + private void requestMore(int n) { if (logger != null) { trace(logger, "requestMore " + n); } - long currentState = forceUpdate(this, state -> addOutstanding(state, n)); + outstanding += n; Objects.requireNonNull(this.subscription).request(n); - return currentState; } @Override @@ -637,19 +623,6 @@ private static long forceUpdate(BufferTimeoutWithBackpressureSubscriber in } } - private static int getOutstanding(long state) { - return (int) ((state & OUTSTANDING_MASK) >> OUTSTANDING_SHIFT); - } - - private static long addOutstanding(long state, long amount) { - long previousWithOutstandingClear = state &~ OUTSTANDING_MASK; - long outstandingMax = OUTSTANDING_MASK >> OUTSTANDING_SHIFT; - long current = (state & OUTSTANDING_MASK) >> OUTSTANDING_SHIFT; - long added = Math.min(current + amount, outstandingMax); - long newOutstanding = ((long) added) << OUTSTANDING_SHIFT; - return previousWithOutstandingClear | newOutstanding; - } - private static > long tryClearWip(BufferTimeoutWithBackpressureSubscriber instance, long expectedState) { for (;;) { final long currentState = instance.state;