New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskLocal doesn't preserve locals for Task.executeOn #612

Closed
Avasil opened this Issue Mar 19, 2018 · 10 comments

Comments

Projects
None yet
3 participants
@Avasil
Collaborator

Avasil commented Mar 19, 2018

See code samples below.

asyncBoundary works fine:

implicit val ec: Scheduler = monix.execution.Scheduler.Implicits.global
val ec2: Scheduler = monix.execution.Scheduler.io()
implicit val opts = Task.defaultOptions.enableLocalContextPropagation

for {
  local <- TaskLocal(0).asyncBoundary(ec2)
  _ <- local.write(100).asyncBoundary(ec)
  v1 <- local.read.asyncBoundary(ec2) // 100
  v2 <- local.read // 100
  _ <- Task { println(s"$v1 : $v2")}
} yield ()

executeOn:

for {
  local <- TaskLocal(0)
  _ <- local.write(100).executeOn(ec2)
  v1 <- local.read.executeOn(ec)  // 0
  v2 <- local.read.executeOn(ec2) // 0
  _ <- Task { println(s"$v1 : $v2")}
} yield ()

Link to the discussion on Gitter

@Avasil Avasil added the bug label Mar 19, 2018

@Avasil Avasil added this to the 3.0.0 milestone Mar 19, 2018

@alexandru

This comment has been minimized.

Member

alexandru commented Mar 19, 2018

/cc @leandrob13 you available for investigating it? 🙂

@leandrob13

This comment has been minimized.

Contributor

leandrob13 commented Mar 19, 2018

@alexandru sure thing!

@leandrob13

This comment has been minimized.

Contributor

leandrob13 commented Mar 19, 2018

@alexandru @Avasil It seems that the writes to TaskLocal must be made from Task.now. This solved the problem:

def write(value: A): Task[Unit] =
    Task.now(ref.update(value))

The TaskExecuteOn creates an Async instance from start and the Task.unsafeStartAsync(source, ctx2, cb2) executes an Eval instance (which is given by the TaskLocal#write). The local doesn't get set when you call the write.

Gonna make the PR.

@alexandru

This comment has been minimized.

Member

alexandru commented Mar 19, 2018

We can't make them eager. If we do, then TaskLocal is broken.

@leandrob13

This comment has been minimized.

Contributor

leandrob13 commented Mar 19, 2018

Understood. Going to leave the PR I already opened and find the correct solution.

@alexandru

This comment has been minimized.

Member

alexandru commented Apr 7, 2018

Sorry for the long delay in response, I had to do play with it myself.

I don't like this solution in PR #613 because the problem is how the TaskRunLoop is implemented and this solution is overriding the Local.bind instructions in TaskRunLoop. In other words we end up with duplicate work and that's not cool.

I also don't think that we need saving and restoring of that context on each trampolined async boundary, I think we need it only for the actually async ones, which the current implementation is failing to do.

The real problem is that when an Async happens, whatever goes on inside it will get overridden in the bind continuation. So local.write(100) happens within the execution of an Async and then the old context gets restored immediately afterwards.

Playing around, I removed all Local.bind statements from TaskRunLoop or other traces of Local, replacing it in that sample with usage of TracingScheduler and it appears to work well:

import monix.eval._
import monix.execution.Scheduler
import monix.execution.schedulers.TracingScheduler

object Playground extends TaskApp {
  implicit val ec: Scheduler = TracingScheduler(Scheduler.computation(4, "ec1"))
  implicit val ec2: Scheduler = TracingScheduler(Scheduler.computation(4, "ec2"))

  override protected val scheduler: Coeval[Scheduler] =
    Coeval.evalOnce(TracingScheduler(Scheduler.global))

  override def runc: Task[Unit] = {
    for {
      local <- TaskLocal(0)
      _ <- local.write(100).executeOn(ec2)
      v1 <- local.read.executeOn(ec) 
      v2 <- local.read.executeOn(ec2)
      _ <- Task { println(s"$v1, $v2") }
    } yield ()
  }
}

So going forward, one possible solution when detecting localContextPropagation == true is to just wrap the Scheduler in a TracingScheduler.

Another one would be something like what @leandrob13 has been doing in #613, but without the duplicated work.

Going away for the Easter holiday, will be back on Tuesday to get to the bottom of this.

@leandrob13

This comment has been minimized.

Contributor

leandrob13 commented Apr 8, 2018

@alexandru just saw your comment, I will be testing this asap. Got caught up with #624.

@leandrob13

This comment has been minimized.

Contributor

leandrob13 commented Apr 8, 2018

@alexandru I remembered it was my first approach to the problem, let the TracingScheduler handle the async boundaries, let me get back to that.

@alexandru

This comment has been minimized.

Member

alexandru commented Apr 8, 2018

@leandrob13

This comment has been minimized.

Contributor

leandrob13 commented Apr 8, 2018

I think that scenario is covered in the tests (provided by one of the issues @Avasil reported):

testAsync("TaskLocal.apply with different schedulers") {
    val test =
      for {
        local <- TaskLocal(0).asyncBoundary(ec2)
        _ <- local.write(800).asyncBoundary(ec)
        v1 <- local.read.asyncBoundary(ec2)
        v2 <- local.read
        _ <- Task.now(assertEquals(v1, v2))
      } yield ()

    test.runAsyncOpt
  }

Neither ec or ec2 are tracing schedulers. If there is a different use case of asyncBoundary that this would not comply, you know I am more than happy to check it out. This is done by something I wanted to validate with you about the context creation in #613 (which takes care of this kind of scenarios given that the local propagation is enabled in options):

  def apply(scheduler: Scheduler, options: Options, connection: StackedCancelable): Context = {
      val em = scheduler.executionModel
      val frameRef = FrameIndexRef(em)
      val sch = if (options.localContextPropagation) scheduler.trace else scheduler
      Context(sch, options, connection, frameRef)
    }

The scheduler.trace is just an extension method that does the wrapping of the current scheduler with a TracingContext. I added this also for Context instances:

  def withScheduler(sch: Scheduler): Context =
      copy(scheduler = if (options.localContextPropagation) sch.trace else sch)

This should take care of your request of using a TracingScheduler upon enabling the local propagation.

@alexandru alexandru referenced this issue May 10, 2018

Merged

MAJOR: Change internal encoding for Task.Async #647

21 of 21 tasks complete

@alexandru alexandru modified the milestones: 3.0.0, 3.0.0-RC2 Sep 25, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment