diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java index 8698c38d0..496b72f96 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java @@ -60,6 +60,10 @@ public boolean hasReachedCompletion() { return current.value instanceof Completion; } + public boolean willReachCompletion() { + return !hasReachedCompletion() && current.next.value instanceof Completion; + } + public boolean hasReachedFailure() { return current.value instanceof Failure; } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java index 20f3dc714..35cd8ccef 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java @@ -21,7 +21,7 @@ public class ReplayOperator extends AbstractMulti { private final AtomicBoolean upstreamSubscriptionRequested = new AtomicBoolean(); private volatile Subscription upstreamSubscription = null; - private final CopyOnWriteArrayList subscriptions = new CopyOnWriteArrayList<>(); + protected final CopyOnWriteArrayList subscriptions = new CopyOnWriteArrayList<>(); public ReplayOperator(Multi upstream, long numberOfItemsToReplay) { this.upstream = upstream; @@ -39,11 +39,11 @@ public void subscribe(MultiSubscriber subscriber) { upstream.subscribe(new UpstreamSubscriber(subscriber)); } ReplaySubscription replaySubscription = new ReplaySubscription(subscriber); - subscriber.onSubscribe(replaySubscription); subscriptions.add(replaySubscription); + subscriber.onSubscribe(replaySubscription); } - private class ReplaySubscription implements Subscription { + protected class ReplaySubscription implements Subscription { private final MultiSubscriber downstream; private final AtomicLong demand = new AtomicLong(); @@ -115,6 +115,12 @@ private void drain() { downstream.onItem(item); emitted++; } + if (!done && cursor.willReachCompletion()) { + cancel(); + cursor.readCompletion(); + downstream.onComplete(); + return; + } demand.addAndGet(-emitted); if (wip.decrementAndGet() == 0) { return; @@ -123,7 +129,7 @@ private void drain() { } } - private class UpstreamSubscriber implements MultiSubscriber, ContextSupport { + protected class UpstreamSubscriber implements MultiSubscriber, ContextSupport { private final MultiSubscriber initialSubscriber; diff --git a/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java b/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java index 362c74016..9fdcd8106 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java @@ -9,8 +9,13 @@ import java.util.Arrays; import java.util.List; import java.util.Random; -import java.util.concurrent.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -66,6 +71,17 @@ void basicReplayAll() { sub.assertCompleted(); } + @Test + void shouldCompleteWhenRequestEqualsMax() { + Multi upstream = Multi.createFrom().range(1, 10); + Multi replay = Multi.createBy().replaying().upTo(9).ofMulti(upstream); + + AssertSubscriber sub = replay.subscribe().withSubscriber(AssertSubscriber.create()); + sub.request(9); + assertThat(sub.getItems()).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9); + sub.assertCompleted(); + } + @Test void basicReplayLatest3() { ExecutorService pool = Executors.newFixedThreadPool(1); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperatorTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperatorTest.java new file mode 100644 index 000000000..1ebe24f49 --- /dev/null +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperatorTest.java @@ -0,0 +1,27 @@ +package io.smallrye.mutiny.operators.multi.replay; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; + +public class ReplayOperatorTest { + + @Test + public void shouldRemoveSubscriptionAfterCompletion() { + // given + var upstream = Multi.createFrom().range(0, 3); + var operator = new ReplayOperator<>(upstream, 3); + + // when + var subscriber = operator.subscribe().withSubscriber(AssertSubscriber.create(3)); + var subscriber2 = operator.subscribe().withSubscriber(AssertSubscriber.create(3)); + + // then + subscriber.assertItems(0, 1, 2).assertCompleted(); + subscriber2.assertItems(0, 1, 2).assertCompleted(); + assertEquals(0, operator.subscriptions.size()); + } +}