diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index a0c6648832..6fa695d19e 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -301,6 +301,14 @@ public void onNext(T t) { // flushes. However, timeout does not reset it to 0, it has its own // flag. boolean terminated = false; + if (terminated) { + previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); + } else { + previousState = forceAddWork(this, state -> incrementIndex(state, 1)); + } + // We can only fire the timer once we increment the index first so that + // the timer doesn't fire first as it would consume the element and try + // to decrement the index below 0. if (getIndex(previousState) == 0) { // fire timer, new buffer starts try { @@ -312,11 +320,6 @@ public void onNext(T t) { terminated = true; } } - if (terminated) { - previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); - } else { - previousState = forceAddWork(this, state -> incrementIndex(state, 1)); - } } else { previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated); } @@ -469,8 +472,6 @@ private void drain(boolean resumeDemand) { } if (terminated && queue.isEmpty()) { - // TODO: make sure we don't do this unless we - // know all the values were delivered first done = true; if (logger != null) { trace(logger, "terminated! error: " + this.error + " queue size: " + queue.size()); @@ -489,10 +490,6 @@ private void drain(boolean resumeDemand) { previousState = resetTimeout(incrementIndex(previousState, toDecrement)); } - // TODO: If we force an update before we lose the knowledge that - // some other actor modified something in between, so we must make - // sure to track these updates in both current and previous and - // compare. currentState = tryClearWip(this, previousState); // If the state changed (e.g. new item arrived, a request was issued, @@ -581,8 +578,7 @@ private static long incrementRequestIndex(long state) { } private static long getIndex(long state) { - long l = (state & INDEX_MASK) >> INDEX_SHIFT; - return l; + return (state & INDEX_MASK) >> INDEX_SHIFT; } private static long incrementIndex(long state, int amount) { @@ -605,10 +601,6 @@ private static long resetTimeout(long state) { return state & ~TIMEOUT_FLAG; } - private static long resetIndex(long state) { - return state & ~INDEX_MASK; - } - private static boolean isTimedOut(long state) { return (state & TIMEOUT_FLAG) == TIMEOUT_FLAG; } @@ -666,7 +658,8 @@ private static > long tryClearWip(BufferTimeo return currentState; } - long nextState = currentState & ~HAS_WORK_IN_PROGRESS_FLAG; + // remove both WIP and requested_index so that we avoid overflowing + long nextState = currentState & ~HAS_WORK_IN_PROGRESS_FLAG & ~REQUESTED_INDEX_MASK; if (STATE.compareAndSet(instance, currentState, nextState)) { if (instance.stateLogger != null) { instance.stateLogger.log(instance.toString(),