subscribeOn(Schedulers.parallel()) doesn't execute in parallel when located right after a just() #342

Closed
antonio-marrero opened this issue Jan 6, 2017 · 3 comments

I've noticed different behaviour when a subscribeOn is placed straight after a just, compared to when another operator sits in between.
For example:

CountDownLatch latch = new CountDownLatch(1);

Flux<Integer> flux = Flux.range(0, 30);
flux.flatMap(count -> Flux.just(count)
                          .subscribeOn(Schedulers.parallel())
                          .map(i -> highCpuProcess(i)))
    .doAfterTerminate(latch::countDown)
    .subscribe(m -> System.out.println("Subscriber received - " + m + " on thread: " + Thread.currentThread().getName()));

latch.await();

This always creates four threads but only uses one and never runs the processes in parallel.
[Image: singlethread]

By contrast, when you put an operator in the middle, all four threads are used. I use log() in my example, but it can be any operator:

CountDownLatch latch = new CountDownLatch(1);

Flux<Integer> flux = Flux.range(0, 30);
flux.flatMap(count -> Flux.just(count)
                          .log()
                          .subscribeOn(Schedulers.parallel())
                          .map(i -> highCpuProcess(i)))
    .doAfterTerminate(latch::countDown)
    .subscribe(m -> System.out.println("Subscriber received - " + m + " on thread: " + Thread.currentThread().getName()));

latch.await();

In this last case, the four threads are used:
[Image: multithread]


antonio-marrero commented Jan 6, 2017

NOTE: Using Mono instead of Flux works fine

CountDownLatch latch = new CountDownLatch(1);

Flux<Integer> flux = Flux.range(0, 30);
flux.flatMap(count -> Mono.just(count)
                          .subscribeOn(Schedulers.parallel())
                          .map(i -> highCpuProcess(i)))
    .doAfterTerminate(latch::countDown)
    .subscribe(m -> System.out.println("Subscriber received - " + m + " on thread: " + Thread.currentThread().getName()));

latch.await();
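
For anyone trying to reproduce the snippets above: highCpuProcess is not shown anywhere in this report. The following is only a hypothetical stand-in (the class name, the 50 ms duration and the int return type are all assumptions) that keeps the calling thread busy so the thread name printed by the subscriber is easy to observe.

```java
public class CpuWorkSketch {

    // Assumed duration of the simulated work; the original report does not say
    // how long highCpuProcess takes.
    static final long BUSY_NANOS = 50_000_000L; // 50 ms

    // Hypothetical stand-in for the highCpuProcess used in the issue:
    // spins on the calling thread for BUSY_NANOS, then returns its input unchanged.
    public static int highCpuProcess(int i) {
        long end = System.nanoTime() + BUSY_NANOS;
        while (System.nanoTime() < end) {
            // busy-wait on purpose: this simulates CPU-bound work, not blocking I/O
        }
        return i;
    }

    public static void main(String[] args) {
        int result = highCpuProcess(7);
        System.out.println("Processed " + result + " on thread: " + Thread.currentThread().getName());
    }
}
```

With a helper like this, each of the 30 elements takes a noticeable amount of time, so it should be visible in the output whether the work stays on one parallel thread or is spread over several.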

@smaldini smaldini added the type/bug A general bug label Jan 6, 2017
@smaldini smaldini added this to the 3.0.5.RELEASE milestone Jan 6, 2017

akarnokd commented Jan 7, 2017

This is a cross-boundary fusion problem with flatMap and an async-fuseable source that schedules emissions internally, like the fused just+subscribeOn and observeOn in general.

I don't know yet how it would be possible to keep such fusion. Therefore, I suggest disabling fusion on these classes:

  • FluxSubscribeOnValue
  • FluxPublishOn

You can simply return NONE from requestFusion, or stop these classes from implementing QueueSubscription.
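
As a rough illustration of why fusing across a thread boundary can matter, here is a plain-JDK sketch. It is not Reactor's code, and it reflects only one reading of the problem: in a fused setup the scheduled task merely signals that an item is available, and the map function runs later inside the consumer's poll(), on whichever thread drains. Every name in it (FusionBoundarySketch, runUnfused, runFused, the "sketch-worker" thread) is made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

public class FusionBoundarySketch {

    public static final String WORKER_NAME = "sketch-worker";

    // Stand-in for the map() function: it reports which thread executed it.
    static final Function<Integer, String> MAPPER = i -> Thread.currentThread().getName();

    static ExecutorService newWorker() {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, WORKER_NAME));
    }

    // Non-fused: the mapper runs inside the scheduled task, so on the worker thread.
    public static String runUnfused(int value) {
        ExecutorService worker = newWorker();
        try {
            Callable<String> task = () -> MAPPER.apply(value);
            return worker.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    // Fused (simplified): the scheduled task only signals "item available";
    // the mapper is applied lazily inside poll(), on the thread that drains.
    public static String runFused(int value) {
        ExecutorService worker = newWorker();
        try {
            Supplier<String> poll = () -> MAPPER.apply(value);
            Runnable signalAvailable = () -> { /* no mapping work happens here */ };
            worker.submit(signalAvailable).get(); // wait for the availability signal
            return poll.get();                    // mapper runs on the calling thread
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("unfused mapper ran on: " + runUnfused(1));
        System.out.println("fused mapper ran on:   " + runFused(1));
    }
}
```

If the consumer is a serialized drain loop, as in flatMap, then in the fused variant all the mapped work ends up on whichever single thread is draining at the time. That would be consistent with the behaviour reported above, though the sketch itself does not prove that this is the exact mechanism inside Reactor.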


smaldini commented Jan 7, 2017

Yes for FluxSubscribeOnValue.
I wonder whether we can keep the fusion for FluxPublishOn when it is not back-fused, though, or when the parent is not fuseable?
