
Why it's blocked when there are 449 elements in the flux and cache? #3413

Closed
lvhuichao opened this issue Mar 23, 2023 · 1 comment
Labels
for/stackoverflow Questions are best asked on SO or Gitter

Comments

@lvhuichao

lvhuichao commented Mar 23, 2023

I found a strange issue when I use Flux.cache(). Here is the code:

    @Test
    public void test() {
        Stream<Integer> integerStream = IntStream.range(0, 449).boxed();
        Flux<Integer> integerFlux = Flux.fromStream(integerStream)
                .log()
                .cache();

        Mono<Map<String, String>> mapMono = integerFlux
                .collectList()
                .map((Function<List<Integer>, Map<String, String>>) integers -> new HashMap<>())
                .cache();

        integerFlux
                .flatMap(integer -> Mono.zip(Mono.just(integer), mapMono))
                .blockLast();
    }

It blocks, but if I change the 449 to 448, it completes successfully. Here are the resulting logs for the two cases.

449: (screenshot of the log output)

448: (screenshot of the log output)

My reactor-core version is 3.4.19.

Please help me, thanks.

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Mar 23, 2023
@OlegDokuka OlegDokuka added for/stackoverflow Questions are best asked on SO or Gitter and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Mar 23, 2023
@OlegDokuka
Contributor

OlegDokuka commented Mar 23, 2023

@lvhuichao

What you observe is a backpressure effect. The cache operator has a default prefetch of 256. The follow-up (replenish) request is 256 - (256 >> 2), which is 192. So two requests happen, first 256 and then 192, which is 448 in total.
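The two requests above can be sketched as plain arithmetic (the class and variable names here are just for illustration; the 75%-consumed replenish rule is Reactor's default prefetch behavior):

```java
public class PrefetchMath {
    public static void main(String[] args) {
        int prefetch = 256;                          // default prefetch of cache()/flatMap
        int replenish = prefetch - (prefetch >> 2);  // replenish request once 75% is consumed
        System.out.println(prefetch);                // first request: 256
        System.out.println(replenish);               // second request: 192
        System.out.println(prefetch + replenish);    // total demand seen upstream: 448
    }
}
```

That total of 448 is exactly why the test passes with 448 elements and hangs with 449.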

You may wonder why you don't see more requests. The answer is your usage of the shared flux: it blocks itself within flatMap. The flatMap operator also has built-in backpressure, so once flatMap stops requesting, the cache's backpressure propagation stops as well, and both operators stop requesting from upstream.

Why might flatMap stop requesting more? The flatMap operator has a default concurrency of 256, which means up to 256 inner publishers can run at the same time and be merged within that operator. Whenever an inner publisher completes, flatMap may request one more element from the upstream. If the inner sources never complete, flatMap does not request more. You can easily check that behavior with the following example:

Flux.range(0, 257)
       .log()
       .flatMap(i -> Flux.never())
       .subscribe();

You will see that onNext(256) never happens. That is fine; it is the backpressure control implemented in flatMap. To change it, you can always adjust the concurrency:

Flux.range(0, 257)
       .log()
       .flatMap(i -> Flux.never(), 257)
       .subscribe();

Now you should see the last element and onComplete after that.

So, what is wrong in your case? The problem in your scenario is that you are flatMapping a flux with a mono derived from that same flux. mapMono can only complete once integerFlux completes, but flatMap stops requesting from integerFlux at element 448 while it waits for mapMono. Neither side can make progress, which means you have a deadlock.

You may try to fix it as follows:

    @Test
    public void test() {
        Stream<Integer> integerStream = IntStream.range(0, 449).boxed();
        Flux<Integer> integerFlux = Flux.fromStream(integerStream)
                .log()
                .cache();

        Mono<Map<String, String>> mapMono = integerFlux
                .collectList()
                .map((Function<List<Integer>, Map<String, String>>) integers -> new HashMap<>())
                .cache();

        integerFlux
                .flatMap(integer -> Mono.zip(Mono.just(integer), mapMono), Integer.MAX_VALUE)
                .blockLast();
    }

@OlegDokuka OlegDokuka closed this as not planned Won't fix, can't repro, duplicate, stale Mar 23, 2023