diff --git a/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java index 96762282a0..e4fdac299f 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java @@ -383,10 +383,12 @@ public Object scanUnsafe(Attr key) { } final void drain() { - if (WIP.getAndIncrement(this) != 0) { + if (WIP.get(this) != 0) { return; } + WIP.getAndIncrement(this); + int missed = 1; for (; ; ) { @@ -398,6 +400,7 @@ final void drain() { boolean empty = q == null || q.isEmpty(); if (checkTerminated(d, empty)) { + WIP.addAndGet(this, -missed); return; } @@ -432,6 +435,7 @@ final void drain() { v = null; } if (checkTerminated(d, v == null)) { + WIP.addAndGet(this, -missed); return; } if (sourceMode != Fuseable.SYNC) { @@ -459,6 +463,7 @@ final void drain() { empty = v == null; if (checkTerminated(d, empty)) { + WIP.addAndGet(this, -missed); return; } @@ -592,14 +597,16 @@ final void remove(FluxPublish.PubSubInner inner) { //contrary to FluxPublish, there is a possibility of auto-cancel, which //happens when the removed inner makes the subscribers array EMPTY if (autoCancel && b == EMPTY && Operators.terminate(S, this)) { - if (WIP.getAndIncrement(this) != 0) { + if (WIP.get(this) != 0) { return; } + WIP.getAndIncrement(this); terminate(); Queue q = queue; if (q != null) { q.clear(); } + WIP.decrementAndGet(this); } return; } diff --git a/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java b/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java index 449d47892b..2c91de9c43 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java @@ -921,4 +921,22 @@ void emitNextWithNoSubscriberNoCapacityKeepsSinkOpenWithBuffer() { .expectTimeout(Duration.ofSeconds(1)) .verify(); } + + @Test + public void cancelledSinkClearsQueue() { + SinkManyEmitterProcessor sinkManyEmitterProcessor = new SinkManyEmitterProcessor<>(true, 1); + // fill the buffer + assertThat(sinkManyEmitterProcessor.tryEmitNext(1)).as("filling buffer").isEqualTo(Sinks.EmitResult.OK); + StepVerifier.create(sinkManyEmitterProcessor) + .expectNext(1) + .expectTimeout(Duration.ofSeconds(1)) + .verify(); + sinkManyEmitterProcessor.asFlux().subscribe().dispose(); + + // fill the buffer + assertThat(sinkManyEmitterProcessor.tryEmitNext(1)).as("filling buffer").isEqualTo(Sinks.EmitResult.OK); + StepVerifier.create(sinkManyEmitterProcessor) + .verifyComplete(); + assertThat(sinkManyEmitterProcessor.queue.isEmpty()).as("Buffer should be empty").isTrue(); + } }