Premature thread interruption when using Mono.just().transform(flux) #1107

Closed
dfeist opened this issue Mar 2, 2018 · 1 comment
Labels: status/has-workaround, type/enhancement

Comments

Contributor

dfeist commented Mar 2, 2018

See: ReactiveX/RxJava#5711
See: ReactiveX/RxJava#5715

In the following scenario:

Scheduler scheduler = Schedulers.fromExecutorService(Executors.newCachedThreadPool());
Flux<Integer> flux = Flux.just(1);
Function<Publisher<Integer>, Publisher<Integer>> addOne = f -> Flux.from(f)
        .map(i -> i + 1)
        .publishOn(scheduler);
flux.concatMap(j -> Mono.just(j).transform(addOne)).subscribe(System.out::println);

  • Because addOne does not return a Mono, Mono.transform wraps its result in a MonoNext.
  • MonoNext calls s.cancel() before actual.onNext(t).
  • The WorkerTask future gets cancelled (because of the s.cancel() in MonoNext), and this can happen even before reactor.core.scheduler.WorkerTask#setFuture is called. If it does, the thread is interrupted and processing on that thread (e.g. reading of IO) can be cut short.

In my case, the thread running the map() is an HTTP selector thread. The Thread.interrupt() triggered from WorkerTask#setFuture cuts short a read on an AbstractInterruptibleChannel with a ClosedByInterruptException, which results in an empty response body.
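
For illustration, the effect of a pending interrupt on a channel read can be reproduced with the JDK alone. The sketch below is a stand-in under stated assumptions: it uses a FileChannel over a temporary file rather than an HTTP socket, and the class and method names are hypothetical. It relies on the documented InterruptibleChannel behaviour that a blocking I/O call made on a thread whose interrupt status is already set closes the channel and throws ClosedByInterruptException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; a FileChannel stands in for the HTTP channel.
class InterruptedReadSketch {

    /** Returns true if a read on an interrupted thread fails with ClosedByInterruptException. */
    static boolean readFailsWhenInterrupted() {
        try {
            Path tmp = Files.createTempFile("interrupt-sketch", ".txt");
            try {
                Files.write(tmp, "response body".getBytes(StandardCharsets.UTF_8));
                try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.READ)) {
                    // Simulate the premature interrupt arriving on the reading thread.
                    Thread.currentThread().interrupt();
                    channel.read(ByteBuffer.allocate(64));
                    return false; // the read completed normally
                } catch (ClosedByInterruptException e) {
                    return true;  // the channel was closed and nothing was read
                } finally {
                    Thread.interrupted(); // clear the flag so cleanup is unaffected
                }
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read failed: " + readFailsWhenInterrupted());
    }
}
```

The caller never sees the file's content, which mirrors the empty response body described above.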

The cancel-before-next in MonoNext is necessary, but it causes a race condition between reactor.core.scheduler.WorkerTask#dispose and sr.setFuture(f) (in reactor.core.scheduler.Schedulers#workerSchedule): when dispose() is called before setFuture(f), the thread that is still processing the onNext() signal is prematurely interrupted.
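
The problematic ordering can be sketched with plain java.util.concurrent. This is a simplified model, not Reactor's WorkerTask: the Task class, the SYNC_DISPOSED and ASYNC_DISPOSED sentinels, and the trackRunner flag are all hypothetical names. With trackRunner off, a late setFuture(f) always cancels with interruption. With it on, the task remembers that dispose() ran on its own runner thread and cancels without interrupting, which is one possible way to avoid the interruption and not necessarily how any actual fix works.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the dispose()/setFuture() race; not Reactor code.
class DisposeRaceSketch {

    // Sentinel futures marking a disposed task (names are assumptions).
    static final Future<?> SYNC_DISPOSED = new FutureTask<Void>(() -> null);
    static final Future<?> ASYNC_DISPOSED = new FutureTask<Void>(() -> null);

    static final class Task implements Runnable {
        final boolean trackRunner;
        final AtomicReference<Future<?>> future = new AtomicReference<>();
        final CountDownLatch disposed = new CountDownLatch(1);
        final CountDownLatch proceed = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(1);
        volatile Thread runner;
        volatile boolean interrupted;

        Task(boolean trackRunner) {
            this.trackRunner = trackRunner;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                dispose();            // stands in for the s.cancel() issued by MonoNext
                disposed.countDown();
                proceed.await();      // stands in for the onNext() work still in progress
            } catch (InterruptedException e) {
                interrupted = true;   // the work was cut short
            } finally {
                done.countDown();
            }
        }

        void dispose() {
            boolean sync = trackRunner && runner == Thread.currentThread();
            Future<?> previous = future.getAndSet(sync ? SYNC_DISPOSED : ASYNC_DISPOSED);
            if (previous != null && previous != SYNC_DISPOSED && previous != ASYNC_DISPOSED) {
                previous.cancel(!sync);
            }
        }

        void setFuture(Future<?> f) {
            if (!future.compareAndSet(null, f)) {
                // Already disposed: interrupt unless dispose() came from the runner itself.
                f.cancel(future.get() != SYNC_DISPOSED);
            }
        }
    }

    /** Forces dispose() to run before setFuture(f) and reports whether the worker was interrupted. */
    static boolean workerInterrupted(boolean trackRunner) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Task task = new Task(trackRunner);
            Future<?> f = executor.submit(task);
            task.disposed.await();    // dispose() has happened on the worker thread
            task.setFuture(f);        // the late setFuture(f) now sees a disposed task
            task.proceed.countDown();
            task.done.await();
            return task.interrupted;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("plain cancel:   interrupted=" + workerInterrupted(false)); // true
        System.out.println("runner tracked: interrupted=" + workerInterrupted(true));  // false
    }
}
```

The latches only make the ordering deterministic for the demonstration. In the real scenario the same ordering happens by chance, depending on how quickly the submitted task starts relative to the submitting thread.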

simonbasle added a commit that referenced this issue Mar 2, 2018
This commit prevents the undesirable interruption of the WorkerTask's
thread when a cancellation happens within the run() method (eg. a
MonoNext is cancelling upon first onNext downstream).

This would previously lead to a race condition between the cancellation
and the setFuture, which would interrupt if called AFTER the dispose.
@simonbasle simonbasle self-assigned this Mar 2, 2018
@simonbasle simonbasle added the type/enhancement A general enhancement label Mar 2, 2018
@simonbasle simonbasle added this to the 3.2.0.RELEASE milestone Mar 2, 2018
@simonbasle simonbasle added the status/has-workaround This has a known workaround described label Mar 2, 2018
@simonbasle
Member

has-workaround: The code snippet above can be changed to use Flux.just(j) in the concatMap instead of Mono.just(j), which avoids the MonoNext and its associated cancellation altogether.

simonbasle added a commit that referenced this issue Mar 6, 2018
This commit prevents the undesirable interruption of the WorkerTask's
thread when a cancellation happens within the run() method (eg. a
MonoNext is cancelling upon first onNext downstream).

This would previously lead to a race condition between the cancellation
and the setFuture, which would interrupt if called AFTER the dispose.

This is a backport of #1108 (commit a0b752a), as tracked in #1111
@simonbasle simonbasle modified the milestones: 3.2.0.RELEASE, 3.2.0.M2 May 17, 2018