Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FluxGroupBy silently drops onNext signals #2352

Open
Sage-Pierce opened this issue Sep 2, 2020 · 13 comments
Open

FluxGroupBy silently drops onNext signals #2352

Sage-Pierce opened this issue Sep 2, 2020 · 13 comments
Milestone

Comments

@Sage-Pierce
Copy link

Sage-Pierce commented Sep 2, 2020

I have a use case where I'm using Flux::groupBy with a relatively high cardinality of grouping (1024-4096 concurrent groups at any given time). FWIW, the use case itself is similar to a sort of deduplication functionality on an infinite stream.

Each "group" is limited in time (.take(Duration)) and size (.take(Long)) and collected as a List. Under high load, it appears some items are being dropped.

Thus far, I think I can point to where the relevant pieces of code are:

Expected Behavior

Items are not dropped by groupBy operator

Actual Behavior

Items emitted from upstream of groupBy operator are not emitted downstream, and are silently being dropped.

Steps to Reproduce

I wrote the following test to show a simplified version of what's going on with my use case:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(Schedulers.elastic())
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L))
                .take(2)
                .collectList(), 16384)
            .doOnNext(batch -> {
                try {
                    Thread.sleep(1L);
                } catch (Exception e) {

                }
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

Interestingly, the test sometimes passes without the doOnNext with Thread.sleep(1L). I had to add that to consistently get it to fail (and mimic small real-world processing overhead of each downstream item)

Possible Solution

I'm not sure. Still trying to figure out exactly what the problem is in groupBy

Your Environment

  • Reactor version(s) used: 3.3.9.RELEASE
  • JVM version: 1.8.0_172
  • OS and version (eg uname -a): Mac OS Catalina 10.15.6 (Darwin Kernel Version 19.6.0 x86_64)
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Sep 2, 2020
@Sage-Pierce
Copy link
Author

Sage-Pierce commented Sep 4, 2020

I think I've isolated this particular unexpected behavior to being the result of a race condition in FluxGroupBy between draining and cancellation on UnicastGroupedFlux.

I think I've figured a workaround that both mitigates my issue and verifies the race condition. If I isolate both publish and subscribe operations on FluxGroupBy with the same Worker (Thread) that's responsible for the take(Duration) scheduling (on the GroupedFlux instances), the problem goes away:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.single(Schedulers.elastic());

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .subscribeOn(scheduler, true)
            .doOnNext(batch -> {
                try {
                    Thread.sleep(1L);
                } catch (Exception e) {

                }
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

I suppose my question to Maintainers is "what should be the general expected behavior in the absence of this Scheduler isolation?"

At the very least, silent dropping of onNext items seems like a bug.

I think it may be debatable whether or not item dropping should be possible in this situation. If so, such behavior may be a surprise to clients, especially without decent knowledge of how publish and subscribe operation threading may cause this condition

@bsideup
Copy link
Contributor

bsideup commented Oct 6, 2020

@Sage-Pierce you seem to be blocking the non-blocking threads. Maybe you wanted publishOn, not subscribeOn before doOnNext?

@bsideup bsideup added status/cannot-reproduce We cannot reproduce this issue. A repro case would help. and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Oct 6, 2020
@bsideup
Copy link
Contributor

bsideup commented Oct 6, 2020

when I remove Thread.sleep from doOnNext everything seems to be fine. Since doing Thread.sleep there is incorrect, @Sage-Pierce please provide a reproducer that is not relying on it.

@bsideup
Copy link
Contributor

bsideup commented Oct 6, 2020

FWIW, the use case itself is similar to a sort of deduplication functionality on an infinite stream.

For deduplicating an infinite stream, I'd recommend something like this (in case Flux#distinct does not work for you):

Flux.<Integer>generate(sink -> sink.next(ThreadLocalRandom.current().nextInt(0, 100)))
	.log("producer")
	.transformDeferred(flux -> {
		Set<Integer> items = new HashSet<>();
		return flux.filter(items::add);
	})
	.log("next")
	.take(100)
	.blockLast();

Note that the Set of items may produce OOM as well due to it being infinite.
Consider using some Cache instead (https://github.com/ben-manes/caffeine or Guava) that supports TTLs / eviction.

@Sage-Pierce
Copy link
Author

Thank you for the comments and suggestions, @bsideup!

you seem to be blocking the non-blocking threads. Maybe you wanted publishOn, not subscribeOn before doOnNext?

I'll update the failure scenario code (below) to use blocking threads

when I remove Thread.sleep from doOnNext everything seems to be fine

Although this is indeed blocking, the sleep call is present to mimic a small (1ms) overhead of processing downstream items. In actual usage, the process does some non-trivial computationally-bounded work that, IMO, isn't relevant to the issue that I think is being exposed. However, I believe the overhead itself is relevant due to increasing the likelihood of racing between publish and subscribe methods of FluxGroupBy (when not delegated to same Worker/Thread).

Here's an example without sleep, but with analogous processing overhead. Note that the test fails or succeeds based on whichever Scheduler configuration is used:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.elastic(); // Fails
        // Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .subscribeOn(scheduler, true)
            .doOnNext(batch -> {
                // Mimic real-world computationally-bound processing overhead
                long startNano = System.nanoTime();
                while (System.nanoTime() - startNano < 1_000_000) ;
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());

For deduplicating an infinite stream, I'd recommend something like this ... using some Cache instead (https://github.com/ben-manes/caffeine or Guava) that supports TTLs / eviction

Using caches with TTLs and eviction is an interesting path to consider. However, there are a few problems with substituting it for the deduplication functionality I'm accomplishing with Reactor. For one, there would be less memory efficiency due to keeping items in the caches longer than they actually need to be. For another, the behavior is slightly different; In my case, when there are items/entities with duplicate IDs in the same window, I want to emit the latest item once the window expires. The provided Set-based substitute would emit the first/earliest item. There might be a way to implement emission of the latest items if Caffeine or Guava provides something like onEviction or onExpire callbacks, but I'm partial to the more-readable style that the Reactor code provides (and one-less library dependency 😄 ).

@bsideup
Copy link
Contributor

bsideup commented Oct 7, 2020

@Sage-Pierce I still see .subscribeOn(scheduler, true) in the code - is it intentional? The problem is that you're blocking the parallel scheduler and it may be that the delays you're using are not getting executed hence the lost items

@Sage-Pierce
Copy link
Author

I still see .subscribeOn(scheduler, true) in the code - is it intentional? The problem is that you're blocking the parallel scheduler

Yes, the subscribeOn is intentional such that groupBy subscription methods are executed with the same Worker that executes upstream publishing and GroupedFlux take cancelling. TBH, I mainly have it there since my real-world usage has further asynchronous boundaries (publishOn) downstream of subscribeOn. That being said, I can remove the subscribeOn altogether and still get same behavior, so it can be removed if it makes debugging easier.

I could certainly be wrong, but I don't think the latest code should be blocking on the parallel Scheduler anymore, due to explicitly specifying non-parallel Scheduler 🤔

@Sage-Pierce
Copy link
Author

Sage-Pierce commented Oct 7, 2020

FWIW, code without subscribeOn:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.elastic(); // Fails
        //Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .doOnNext(batch -> {
                // Mimic real-world computationally-bound processing overhead
                long startNano = System.nanoTime();
                while (System.nanoTime() - startNano < 1_000_000) ;
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

@bsideup
Copy link
Contributor

bsideup commented Oct 8, 2020

@Sage-Pierce this code still blocks the thread (by running the heavy computations). Consider using publishOn before doOnNext

@Sage-Pierce
Copy link
Author

@bsideup I'll go ahead and add the publishOn, however I'm not certain I understand the value it provides as far as diagnosing the issue 🤔 I feel like it would at best seem to mask it by working around whatever/wherever the problem is. However, I could again be missing something.

In any case, with the publishOn, I get more interesting behavior. The test either hangs or succeeds with following code:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.elastic(); // Hangs
//        Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .publishOn(scheduler)
            .doOnNext(batch -> {
                // Mimic real-world computationally-bound processing overhead
                long startNano = System.nanoTime();
                while (System.nanoTime() - startNano < 1_000_000) ;
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

And either hangs or flaky hang-fails with following code (note difference in downstream publishOn Scheduler):

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.elastic(); // Hangs
//        Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Flaky hang-fails

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .publishOn(Schedulers.newParallel("test"))
            .doOnNext(batch -> {
                // Mimic real-world computationally-bound processing overhead
                long startNano = System.nanoTime();
                while (System.nanoTime() - startNano < 1_000_000) ;
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

Interestingly, the hanging behavior seems similar to issue discussed in #2138

@bsideup
Copy link
Contributor

bsideup commented Oct 13, 2020

Although I haven't fixed the issue yet, I think I got a much simpler reproducer:

final int total = 100;

Long count = Flux.range(0, total)
                 .groupBy(i -> (i / 2) * 2, 42)
                 .flatMap(it -> it.take(1), 2)
                 .publishOn(Schedulers.parallel(), 2)
                 .count()
                 .block(Duration.ofSeconds(60));
assertThat(total - count).as("count").isZero();

fails with:

Expected :0
Actual   :44

Sage-Pierce pushed a commit to Sage-Pierce/rhapsody that referenced this issue Nov 6, 2020
Sage-Pierce pushed a commit to Sage-Pierce/rhapsody that referenced this issue Nov 11, 2020
…opping items

[ExpediaGroup#94]Add test for high-load groupBy dropping (w/ early cancellation)
reactor/reactor-core#2352

[ExpediaGroup#94]Update DeduplicatingTransformerTest to actually expose bug

Fix comment
Sage-Pierce pushed a commit to ExpediaGroup/rhapsody that referenced this issue Nov 11, 2020
[#94]Add test for high-load groupBy dropping (w/ early cancellation)
reactor/reactor-core#2352

[#94]Update DeduplicatingTransformerTest to actually expose bug

Fix comment
Sage-Pierce pushed a commit to Sage-Pierce/rhapsody that referenced this issue Nov 13, 2020
…opping items

[ExpediaGroup#94]Add test for high-load groupBy dropping (w/ early cancellation)
reactor/reactor-core#2352

[ExpediaGroup#94]Update DeduplicatingTransformerTest to actually expose bug

Fix comment

(cherry picked from commit 0e8b963)
Sage-Pierce pushed a commit to ExpediaGroup/rhapsody that referenced this issue Nov 15, 2020
[#94]Add test for high-load groupBy dropping (w/ early cancellation)
reactor/reactor-core#2352

[#94]Update DeduplicatingTransformerTest to actually expose bug

Fix comment

(cherry picked from commit 0e8b963)
@bsideup bsideup removed their assignment Dec 1, 2020
@simonbasle simonbasle added status/need-investigation This needs more in-depth investigation and removed status/cannot-reproduce We cannot reproduce this issue. A repro case would help. labels Feb 18, 2021
@simonbasle simonbasle added this to the 3.3.x Backlog milestone Feb 18, 2021
@simonbasle
Copy link
Member

@Sage-Pierce since I see that you've reworked your code, and since we've made other changes to groupBy in the past year, can this issue be closed?

@simonbasle simonbasle added for/user-attention This issue needs user attention (feedback, rework, etc...) and removed status/need-investigation This needs more in-depth investigation labels Nov 17, 2021
@Sage-Pierce
Copy link
Author

Sage-Pierce commented Nov 22, 2021

@simonbasle I'm not sure I can say this isn't still an issue. Even with Reactor 3.4.12 I can still observe silently dropped emissions with the following simplified example:

    @Test
    public void upstreamEmissionsShouldMatchDownstream() throws Exception {
        Hooks.onNextDroppedFail();
        int numGroups = 8;

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CompletableFuture<Void> future = new CompletableFuture<>();

        Flux.generate(sink -> sink.next(UUID.randomUUID()))
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .groupBy(uuid -> Math.abs(uuid.hashCode() % numGroups))
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L)), numGroups)
            .subscribe(next -> downstream.incrementAndGet(), future::completeExceptionally, () -> future.complete(null));

        future.get();
        System.out.println("Emitted: " + upstream.get());
        assertEquals(upstream.get(), downstream.get());
    }

I believe the problem is the time-based take on the "groups" from groupBy. What this appears to cause is a race condition between upstream item emissions and group cancellation (which happens on a different thread than the upstream emissions). If the .take(Duration.ofSeconds(1L)) is replaced with something non-timer-oriented, like .take(2), the test works fine.

To your point, though, I have found a workaround for this by ensuring that upstream publishing and group cancellation happens on the same thread by using a single-Worker Scheduler. The following modified example causes the test to pass:

    @Test
    public void upstreamEmissionsShouldMatchDownstream() throws Exception {
        Hooks.onNextDroppedFail();
        int numGroups = 8;
        Scheduler scheduler = Schedulers.single(Schedulers.boundedElastic());

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CompletableFuture<Void> future = new CompletableFuture<>();

        Flux.generate(sink -> sink.next(UUID.randomUUID()))
            .take(Duration.ofSeconds(30L))
            .publishOn(scheduler)
            .doOnNext(next -> upstream.incrementAndGet())
            .groupBy(uuid -> Math.abs(uuid.hashCode() % numGroups))
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler), numGroups)
            .subscribe(next -> downstream.incrementAndGet(), future::completeExceptionally, () -> future.complete(null));

        future.get();
        System.out.println("Emitted: " + upstream.get());
        assertEquals(upstream.get(), downstream.get());
   }

I still suspect users would be scratching their heads over the case where items are dropped. At the very least, I would expect the dropped onNext signals to result in an error due to the onNextDroppedFail hook. Ideally, however, I think it would be preferable that items are not dropped at all.

@simonbasle simonbasle removed the for/user-attention This issue needs user attention (feedback, rework, etc...) label Sep 26, 2022
@chemicL chemicL modified the milestones: 3.4.x Backlog, 3.6.x Backlog Mar 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants