Fix #1045: ensure early cancellation of Observable#mapParallel on error #1064

Merged
Avasil merged 9 commits into monix:master on Nov 6, 2019

@TapanVaishnav
Contributor

TapanVaishnav commented Oct 25, 2019

closes: #1045

@Avasil

Collaborator

Avasil commented Oct 26, 2019

Thanks @TapanVaishnav! Could you add a test for it? Something that checks that if n tasks are running in parallel and any of them fails, then all the others are canceled as well. It should be quite simple to do with TestScheduler, which can be used to manually tick time, e.g.

val failedTask = Task.raiseError(ex).delayExecution(1.second)
val otherTask = Task.sleep(2.second).doOnCancel(incrementCounter)

Observable(0, 1, 2, 3, 4, 5, 6).mapParallelOrdered(n)(i => if (i == 0) failedTask else otherTask)

and then we can do s.tick(1.second) to instantly complete failedTask and see if the others were canceled.
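
A minimal, self-contained sketch of the test idea described above, assuming Monix 3.x is on the classpath. The object name `EarlyCancellationSketch`, the `canceled` counter, the dummy exception and the parallelism of 4 are illustrative choices, not taken from the PR:

```scala
import monix.eval.Task
import monix.execution.schedulers.TestScheduler
import monix.reactive.Observable
import scala.concurrent.duration._
import scala.util.Try

object EarlyCancellationSketch {
  // Returns (number of cancel callbacks observed, current value of the result future).
  def run(): (Int, Option[Try[List[Unit]]]) = {
    implicit val s: TestScheduler = TestScheduler()
    var canceled = 0
    val ex = new RuntimeException("dummy") // hypothetical error used only for this sketch

    // Fails after one virtual second.
    val failedTask = Task.raiseError[Unit](ex).delayExecution(1.second)
    // Would need two virtual seconds to finish; counts how often it gets canceled.
    val otherTask = Task.sleep(2.seconds).doOnCancel(Task { canceled += 1 })

    val f = Observable(0, 1, 2, 3, 4, 5, 6)
      .mapParallelOrdered(4)(i => if (i == 0) failedTask else otherTask)
      .toListL
      .runToFuture

    // Advance virtual time far enough for failedTask to fail, but not for otherTask to finish.
    s.tick(1.second)
    (canceled, f.value)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

After the tick, the first element of the result says how many of the still-running tasks received a cancel signal. No `otherTask` can have completed successfully at that point, because each one needs two virtual seconds.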

@TapanVaishnav

Contributor Author

TapanVaishnav commented Oct 26, 2019

Got it, will add a test. Thanks

@TapanVaishnav

Contributor Author

TapanVaishnav commented Oct 30, 2019

@Avasil Hi, I've added a test to MapParallelOrderedSuite and was planning to add one to MapParallelUnorderedSuite, but because the map is unordered in the latter case, we can't rely on the value of the counter. For example:

    test("should cancel the whole stream when one fails") { implicit s =>
      // dummy error for the failing task (raising a null Throwable is best avoided)
      val wasThrown: Throwable = new RuntimeException("dummy")
      var received = 0

      val failedTask = Task.raiseError(wasThrown).delayExecution(1.second)
      val otherTask = Task.sleep(2.second).doOnCancel(Task { received += 1 })

      Observable(0, 1, 2, 3, 4, 5, 6).mapParallelUnordered(4)(i => if (i == 0) failedTask else otherTask)
        .toListL.runToFuture

      s.tick(1.second)

      assertEquals(received, 3)
    }

In the above case, because the map is unordered, the test might pass or fail. Is that right?

@Avasil

Collaborator

Avasil commented Oct 30, 2019

I feel like it should still work because it will preserve the order of the input (so failedTask should start first, followed by 1, 2, 3).

Did you experience non-deterministic behavior here?

@TapanVaishnav

Contributor Author

TapanVaishnav commented Oct 30, 2019

Did you experience non-deterministic behavior here?

Observable(0,1,2,3,4,5,6).mapParallelUnordered(4)(i => Task.now(println(i))).toListL.runToFuture
Yes, I changed the task inside the map to print the order, and it varies.
In one run the order was 1->3->4->2->0->5->6 and in another it was 1->4->3->2->0->6->5.

Though I'm not sure whether this is the input order or the order in which the Tasks complete on their respective threads.

@TapanVaishnav

Contributor Author

TapanVaishnav commented Oct 30, 2019

Also, I tried running the test from #1064 (comment): out of three runs, it passed in one and failed in the others. So it seems like the input order isn't preserved (which I believe should be the case).

@Avasil

Collaborator

Avasil commented Oct 30, 2019

In your example the tasks are "instant", so I can see the output happening this way. I think it would be a little more consistent with a small delay.

AFAIK mapParallelUnordered will start input tasks in the correct order, but they can finish in a different order, and it will send any result downstream as soon as it is done.

Also, I tried running the #1064 (comment) test and out of three cases, it passed in one and failed in others. So it seems like the input order isn't preserved.

I wonder if we have a race condition, i.e. the failing task cancels the other tasks, they release the semaphore, and the tasks that were waiting for a permit start.

Do you see any tasks successfully completing or just more cancels?
I think we could try guaranteeCase instead of doOnCancel to track all exit cases and see what's going on.
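
One way to track every exit case with guaranteeCase could look like the sketch below. It assumes Monix 3.x together with cats-effect's `ExitCase`; the names `ExitCaseTrackingSketch`, `tracked` and `record`, the string labels and the dummy exception are hypothetical and only serve the illustration:

```scala
import cats.effect.ExitCase
import monix.eval.Task
import monix.execution.schedulers.TestScheduler
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

object ExitCaseTrackingSketch {
  // Runs the scenario for one virtual second and returns (element, exit case) pairs.
  def run(): List[(Int, String)] = {
    implicit val s: TestScheduler = TestScheduler()
    val exits = ListBuffer.empty[(Int, String)]

    // Records how the task for element i terminated.
    def record(i: Int, how: String): Task[Unit] =
      Task { exits += ((i, how)); () }

    def tracked(i: Int): Task[Unit] = {
      val base: Task[Unit] =
        if (i == 0) Task.raiseError[Unit](new RuntimeException("dummy")).delayExecution(1.second)
        else Task.sleep(2.seconds)

      // Unlike doOnCancel, guaranteeCase reports completion, error and cancellation.
      base.guaranteeCase {
        case ExitCase.Completed => record(i, "completed")
        case ExitCase.Error(_)  => record(i, "error")
        case ExitCase.Canceled  => record(i, "canceled")
      }
    }

    Observable(0, 1, 2, 3, 4, 5, 6)
      .mapParallelUnordered(4)(i => tracked(i))
      .toListL
      .runToFuture

    s.tick(1.second)
    exits.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Printing the pairs shows which elements were actually started and how each one ended. That makes it possible to tell extra cancellations apart from tasks that were started after the failure.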

@TapanVaishnav

Contributor Author

TapanVaishnav commented Oct 30, 2019

I think it would be a little bit more consistent with a small delay
I think we could try guaranteeCase instead of doOnCancel to track all exit cases and see what's going on

I see, will try both, thanks for your input. 👍

@TapanVaishnav

Contributor Author

TapanVaishnav commented Oct 30, 2019

Hi @Avasil, I changed my observable to this:
Observable(0,1,2,3,4,5,6).mapParallelUnordered(4)(i => Task(println(i)).delayExecution(1.second)).toListL.runToFuture
and the following are the outputs for two different runs:
0->3->2->1->5->6->4
1->2->0->3->6->4->5

Am I executing my tasks in the right manner?

@Avasil

Collaborator

Avasil commented Oct 30, 2019

This is correct. Note that the first 4 tasks are always from the 0-3 range.
0, 1, 2, 3 start and wait 1 second. They run concurrently, so they can print in a different order.

@TapanVaishnav

Contributor Author

TapanVaishnav commented Oct 30, 2019

I see, but if that is the case, then the code below should always produce the same result (= 3), as failedTask would always be among the first tasks started, right? But in some runs received ends up as 4 and in others as 3.


    test("should cancel the whole stream when one fails") { implicit s =>
      // dummy error for the failing task (raising a null Throwable is best avoided)
      val wasThrown: Throwable = new RuntimeException("dummy")
      var received = 0

      val failedTask = Task.raiseError(wasThrown).delayExecution(1.second)
      val otherTask = Task.sleep(2.second).doOnCancel(Task { received += 1; println("otherTask") })

      Observable(0, 1, 2, 3, 4, 5, 6).mapParallelUnordered(4)(i => if (i == 0) failedTask else otherTask).toListL.runToFuture

      s.tick(1.second)
      assertEquals(received, 6)
    }

Also, I will check for the race condition (which you mentioned earlier) and update this comment.

@TapanVaishnav

Contributor Author

TapanVaishnav commented Oct 30, 2019

OK, so the output is not the same on each run, but at most only the 5th Task gets called. Running
Observable(0,1,2,3,4,5,6).mapParallelUnordered(4)(i => if (i == 0) failedTask else { Task.now(println(i)); otherTask }).toListL.runToFuture
produces two kinds of output:

3
1
2
// the above order can be different as you mentioned earlier
otherTask
otherTask
otherTask
4
otherTask

and

3 
1
2
// the above order can be different as you mentioned earlier
otherTask
otherTask
otherTask

I will look further into the code.

@TapanVaishnav

Contributor Author

TapanVaishnav commented Oct 31, 2019

@Avasil I think it was the race condition you mentioned: failedTask was releasing the semaphore on cancellation and the next task (element 4) was getting triggered (not sure if that makes much sense). So I changed the doOnCancel function to cancel the composite instead of releasing the semaphore, and it's working as it should.

@@ -105,7 +105,7 @@ private[reactive] final class MapParallelOrderedObservable[A, B](
composite -= head.cancelable
case Failure(ex) =>
lastAck = Stop
composite -= head.cancelable
composite.cancel() // cancel the whole downstream on error

@Avasil

Avasil Nov 4, 2019

Collaborator

I think it would be better to call composite.cancel() in the process method, i.e. as soon as the task completes with a failure.

@Avasil

Avasil Nov 5, 2019

Collaborator

@TapanVaishnav Sorry for nitpicking but this line should be redundant now :D

@TapanVaishnav

TapanVaishnav Nov 5, 2019

Author Contributor

Ahhh, because process calls sendDownstreamOrdered, which then calls onNext, which in turn calls process again. And if we stop the stream in process itself, then we need not worry about the sendDownstreamOrdered method.

@TapanVaishnav

TapanVaishnav Nov 5, 2019

Author Contributor

@Avasil I have removed the above line, and I believe we don't need to add composite -= head.cancelable again, right?

@Avasil

Avasil Nov 5, 2019

Collaborator

Yes, I think there are some redundant operators there, but it's OK. I have to sit down one day and refactor the entire operator, it's a bit of a mess :D

@Avasil

Collaborator

Avasil commented Nov 4, 2019

Thank you @TapanVaishnav
This fix should be fine. I've left one suggestion (it might cause the ordered version to need a similar treatment, but I'm not sure).

@TapanVaishnav

Contributor Author

TapanVaishnav commented Nov 5, 2019

@Avasil Thank you for your review. I've added composite.cancel() in both the process and sendDownstreamOrdered methods. Also, I created a task that cancels the composite instead of releasing the semaphore.

private[this] val cancelComposite = Task.eval(composite.cancel())
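
To illustrate the idea (this is not the operator's actual code), here is a small sketch of a doOnCancel callback that cancels a shared `CompositeCancelable` rather than releasing a semaphore permit. It assumes Monix 3.x; `CancelCompositeSketch`, `sibling` and `running` are hypothetical names, and `sibling` merely stands in for another in-flight task registered in the composite:

```scala
import monix.eval.Task
import monix.execution.cancelables.{BooleanCancelable, CompositeCancelable}
import monix.execution.schedulers.TestScheduler
import scala.concurrent.duration._

object CancelCompositeSketch {
  // Returns (sibling canceled?, composite canceled?).
  def run(): (Boolean, Boolean) = {
    implicit val s: TestScheduler = TestScheduler()

    val composite = CompositeCancelable()
    val sibling = BooleanCancelable() // stand-in for another running task
    composite += sibling

    // Same shape as the line quoted above: a Task that cancels the whole composite.
    val cancelComposite = Task.eval(composite.cancel())

    // A long-running task that, when canceled, takes the composite down with it.
    val running = Task.sleep(2.seconds).doOnCancel(cancelComposite).runToFuture
    s.tick()

    running.cancel()
    s.tick() // let any scheduled cancellation logic run

    (sibling.isCanceled, composite.isCanceled)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because the callback cancels the composite instead of handing back a permit, cancelling one task does not open a slot through which a waiting element could still be started.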

@TapanVaishnav

Contributor Author

TapanVaishnav commented Nov 5, 2019

All the tests are passing on my local machine, and I didn't even touch the tests which are failing on CI (they don't have any dependency on MapParallel(Un)orderedSuite).
Can you please have a look?

@@ -160,6 +157,7 @@ private[reactive] final class MapParallelOrderedObservable[A, B](

case Failure(error) =>
lastAck = Stop
composite.cancel()

@Avasil

Avasil Nov 5, 2019

Collaborator

The order should be different: there is no gain in composite -= future.cancelable after the composite is canceled.
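
The effect of the ordering can be seen in isolation with a small sketch, assuming Monix's `CompositeCancelable` and `BooleanCancelable`. The object and method names are hypothetical, and `done` merely stands in for `future.cancelable`:

```scala
import monix.execution.cancelables.{BooleanCancelable, CompositeCancelable}

object CancelOrderSketch {
  // Cancel the composite first, then remove: the removal comes too late to matter.
  def cancelThenRemove(): (Boolean, Boolean) = {
    val done = BooleanCancelable() // stand-in for future.cancelable
    val other = BooleanCancelable()
    val composite = CompositeCancelable(done, other)

    composite.cancel()
    composite -= done

    (done.isCanceled, other.isCanceled)
  }

  // Remove first, then cancel: the removed reference is left alone.
  def removeThenCancel(): (Boolean, Boolean) = {
    val done = BooleanCancelable()
    val other = BooleanCancelable()
    val composite = CompositeCancelable(done, other)

    composite -= done
    composite.cancel()

    (done.isCanceled, other.isCanceled)
  }

  def main(args: Array[String]): Unit = {
    println(cancelThenRemove())
    println(removeThenCancel())
  }
}
```

In the first ordering both cancelables end up canceled, so the `-=` has no observable effect. In the second, only the cancelable still registered in the composite is canceled.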

@TapanVaishnav

TapanVaishnav Nov 6, 2019

Author Contributor

the order should be different,

Do we even need the line composite -= future.cancelable now, given that we can just cancel the composite using composite.cancel()?

@Avasil

Avasil Nov 6, 2019

Collaborator

I'm not 100% sure, so I'd prefer to stay with the status quo.

@TapanVaishnav

TapanVaishnav Nov 6, 2019

Author Contributor

Hmm, Ok. Done

@Avasil

Collaborator

Avasil commented Nov 5, 2019

All the tests are passing on my local machine and I didn't even touch the tests which are failing on CI(and don't have any dependency on MapParallel(Un)orderedSuite),
can you please have a look?

Looks like a random error, I restarted CI

@Avasil
Avasil approved these changes Nov 6, 2019
Collaborator

Avasil left a comment

Thank you for the PR and sorry for nitpicking :D

@Avasil Avasil merged commit 4c18679 into monix:master Nov 6, 2019
1 check passed
continuous-integration/travis-ci/pr The Travis CI build passed
@TapanVaishnav

Contributor Author

TapanVaishnav commented Nov 6, 2019

@Avasil hey, no worries.
I totally understand your concern, if you don’t nitpick now, the code will become cumbersome to understand later.
And I’m also in a learning phase so totally appreciate all the comments.
