-
-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix #719: propagate cancellation to tasks in async Consumers #919
Conversation
(out, AssignableCancelable.dummy) | ||
(out, SingleAssignCancelable.plusOne(Cancelable { () => | ||
out.synchronized { | ||
isDone = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be isDone = true
?
@@ -31,13 +31,22 @@ import scala.concurrent.Future | |||
private[reactive] final class ForeachAsyncConsumer[A](f: A => Task[Unit]) extends Consumer[A, Unit] { | |||
|
|||
def createSubscriber(cb: Callback[Throwable, Unit], s: Scheduler): (Subscriber[A], AssignableCancelable) = { | |||
var isDone = false | |||
var lastCancelable = Cancelable.empty | |||
|
|||
val out = new Subscriber[A] { | |||
implicit val scheduler = s | |||
private[this] var isDone = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you've forgotten to remove it
@@ -43,7 +45,10 @@ private[reactive] final class MapTaskConsumer[In, R, R2](source: Consumer[In, R] | |||
try { | |||
val task = f(value) | |||
streamErrors = false | |||
task.runAsync(cb) | |||
self.synchronized { | |||
if (!isCancelled) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like we don't have to synchronize isCancelled
if it's volatile
(or it doesn't have to be volatile
if it's only used in synchronized block)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'm synchronizing access across two variables here. Volatile probably is redundant
@@ -30,7 +30,9 @@ private[reactive] final class MapTaskConsumer[In, R, R2](source: Consumer[In, R] | |||
extends Consumer[In, R2] { | |||
|
|||
def createSubscriber(cb: Callback[Throwable, R2], s: Scheduler): (Subscriber[In], AssignableCancelable) = { | |||
val asyncCallback = new Callback[Throwable, R] { | |||
@volatile var lastCancelable: Cancelable = Cancelable.empty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it have to be volatile
?
@@ -24,6 +24,8 @@ import monix.execution.exceptions.DummyException | |||
import monix.reactive.{BaseTestSuite, Consumer, Observable} | |||
import scala.util.Failure | |||
|
|||
import monix.execution.atomic.Atomic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import?
Missing |
No description provided.