Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate Local isolation in runToFuture #1213

Merged
merged 20 commits into from
Oct 30, 2020

Conversation

Avasil
Copy link
Collaborator

@Avasil Avasil commented Jul 15, 2020

Fixes #848

@jvican @mdedetrich

This is what I have right now.

The change is to only restore parent Local on the current Thread and call the Future callback with isolated one which will be kept by each Future continuation.

I have released a SNAPSHOT: 3.3.0-69f970a-SNAPSHOT

If you could check if it works for your use case then it will be very helpful :)
I need to do more testing, check performance impact, consider more use cases (e.g. mapping completed future) etc.
I expect bugs but I feel like it might be already usable for the main use case.

I did a simple test: ( modify local in Task, read in Akka HTTP Directive)

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server._
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.misc.Local

import scala.util.Try

object AkkaHttpTest extends HttpApp with App {
  implicit val s: Scheduler = Scheduler.traced
  implicit val opts: Task.Options = Task.defaultOptions
  val local = Local(0)

  val addResponseHeader: Directive0 =
    mapInnerRoute(_.andThen(_.map { result =>
      println(s"Add Header: ${local.get}")
      result
    }))

  def onCompleteLocal[T](task:  Task[T]): Directive1[Try[T]] = {
    import akka.http.scaladsl.util.FastFuture._
    Directive { inner =>
      ctx =>
        task.runToFuture.fast.transformWith(t => inner(Tuple1(t))(ctx))
    }
  }

  override def routes: Route =
    path("hello" / IntNumber) { id =>
      get {
        addResponseHeader {
          onCompleteLocal(Task {
            local := id
          }) { _ => complete(StatusCodes.OK) }
        }
      }
    }

  AkkaHttpTest.startServer("localhost", 8080)
}

And then ran many concurrent requests with unique "id".
Each Task retained it so it looks promising!

@mdedetrich
Copy link
Contributor

Sorry for the late response, will check it out during the week!

@mdedetrich
Copy link
Contributor

mdedetrich commented Jul 25, 2020

Okay so I ran my tests against this snapshots and this is the problem that I am getting.

With mdedetrich/monix-mdc#2 the following test fails

[info] - can Write with Task and get in Future inside Future for comprehension *** FAILED ***
[info]   null was not equal to "NehdniintwruettEygwcikykrnlBhdrjtJvanudizpceYwkeslksynembyirajbwhwBhrtvtmsogxfilcyjgalyhcQztkwEPMpBa" (MDCFutureSpec.scala:50)

And with mdedetrich/monix-opentracing#2

FutureTaskTracingSchedulerSpec

Concurrently sets tags correctly with Task first
java.lang.NullPointerException was thrown.
java.lang.NullPointerException
	at FutureTaskTracingSchedulerSpec.$anonfun$new$21(FutureTaskTracingSchedulerSpec.scala:78)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at monix.execution.schedulers.TracingRunnable.run(TracingRunnable.scala:33)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

and

FutureTaskScalaConcurrentSpec

Concurrently sets tags correctly with Task first
java.lang.NullPointerException was thrown.
java.lang.NullPointerException
	at FutureTaskScalaConcurrentSpec.$anonfun$new$20(FutureTaskScalaConcurrentSpec.scala:84)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at monix.execution.internal.InterceptRunnable.run(InterceptRunnable.scala:27)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:14
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

So I think it may have fix some issues but created others?

Also my own mdedetrich/akka-monix-test#1 seems to be failing (the main difference between my own version and your version is that I am using MDC instead of Local directly but maybe there is some interaction I am not aware of?)

@Avasil
Copy link
Collaborator Author

Avasil commented Jul 25, 2020

Thanks, I have a limited access to the Internet right now but I will look at your test cases as soon as I'm able

@Avasil Avasil mentioned this pull request Aug 7, 2020
@alexandru alexandru changed the base branch from master to series/3.x August 11, 2020 18:14
@@ -31,7 +32,7 @@ import scala.util.control.NonFatal
/** Represents an asynchronous computation that can be canceled
* as long as it isn't complete.
*/
sealed abstract class CancelableFuture[+A] extends Future[A] with Cancelable { self =>
sealed abstract class CancelableFuture[+A](isolatedCtx: Local.Context) extends Future[A] with Cancelable { self =>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporary solution to see if it's worth pursuing - if it works, I'll hide everything

@Avasil
Copy link
Collaborator Author

Avasil commented Sep 1, 2020

@mdedetrich

I was pretty sure that my previous attempt was consistently passing tests but when I tried it now, it's not the case so it seems like I was deceived.

I tried a different angle that is available in: 3.2.2+49-d027e5f7-SNAPSHOT

I tested on your monix-mdc and akka-monix-test.
I haven't tried monix-opentracing yet, I will do it tomorrow.

There is at least one limitation:
It only works as long as we chain using CancelableFuture operators.

More precisely:

task.runToFuture.flatMap(_ => Future()).flatMap(_ => Future())

is OK

task.runToFuture.fast.flatMap(_ => Future()).flatMap(_ => Future())

is NOT OK because it wraps CancelableFuture and uses different implementations of flatMap / transform etc. that don't piggyback on CancelableFuture.

As a consequence, akka-monix-test needs to be modified to:

  def onCompleteLocal[T](task: => Task[T]): Directive1[Try[T]] = {
    Directive { inner => ctx =>
      task.runToFutureOpt.transformWith(t => inner(Tuple1(t))(ctx))
    }
  }

I feel like it could be OK in practice because we could provide these directives in monix-mdc / monix-opentracing and discourage direct usage of Local. Although I don't know how many Akka HTTP directives use FastFuture, I hope that's just onComplete.

There also shouldn't be Local.isolate there:

onCompleteLocal(Task {
  Local.isolate {
    MDC.put("local", id.toString)
  }
}) 

Task will be automatically isolated with runToFuture.
If we use Local.isolate for MDC.put only, it won't be observable for anyone.

@Avasil
Copy link
Collaborator Author

Avasil commented Sep 1, 2020

BTW I feel like monix-mdc could use a test along these lines:

    def getAndPutTask(key: String, value: String): Future[String] =
      for {
        _ <- Task {
              MDC.put(key, value)
            }.runToFutureOpt
        get <- Future {
                MDC.getCopyOfContextMap.size shouldBe 1
                MDC.get(key)
              }
      } yield get

    "Write with Task and get in Future inside Future for comprehension concurrently" in {
      val keyValues = MultipleKeysMultipleValues.multipleKeyValueGenerator.sample.get

      val futures = keyValues.keysAndValues.map { keyValue =>
        getAndPutTask(keyValue.key, keyValue.value)
      }

      val future = Future.sequence(futures)

      future.map { retrievedKeyValues =>
        retrievedKeyValues.size shouldBe keyValues.keysAndValues.size
        retrievedKeyValues shouldBe keyValues.keysAndValues.map(_.value)
      }
    }

Note MDC.getCopyOfContextMap.size shouldBe 1 that checks for isolation. If Task is not isolated, there would be more keys from unrelated requests.

@Avasil
Copy link
Collaborator Author

Avasil commented Sep 2, 2020

I have tried monix-opentracing and it looks good to me as well.

FutureTaskTracingSchedulerSpec passes without any changes

FutureTaskScalaConcurrentSpec passes after using TracingScheduler - I didn't look closely into it, but probably the correct context was lost in the Future chain.

Change was simple:

implicit val scheduler: Scheduler = TracingScheduler(AsyncScheduler(
  Scheduler.DefaultScheduledExecutor,
  new TracedExecutionContext(ExecutionContext.Implicits.global, tracer),
  ExecutionModel.Default,
  UncaughtExceptionReporter.default
))

If TracingScheduler is used, it is also not required to call runToFutureOpt with enableLocalContextPropagation - runToFuture is enough.

In the following days, I'll do more testing and try to implement it in a binary compatible way.

@mdedetrich
Copy link
Contributor

@Avasil

My only comment is on

task.runToFuture.fast.flatMap(_ => Future()).flatMap(_ => Future())

is NOT OK because it wraps CancelableFuture and uses different implementations of flatMap / transform etc. that don't piggyback on CancelableFuture.

I will have to look into this because akka-http uses fast future internally and because of this we may have issues achieving the ultimate goal (which is being able to pass locals around from routes down to business logic implemented in Monix).

Apart from that everything is looking good, let me know if you need anything else!

@Avasil
Copy link
Collaborator Author

Avasil commented Sep 8, 2020

Thanks @mdedetrich - I don't have access to any Akka HTTP codebase right now so I will appreciate investigation there!

if FastFuture is a concern then I'll try to figure something out :)

@mdedetrich
Copy link
Contributor

I will look at it mid next week, I have holidays then so plenty of time to look into it properly.

@Avasil Avasil mentioned this pull request Sep 13, 2020
@Avasil
Copy link
Collaborator Author

Avasil commented Oct 7, 2020

Any update @mdedetrich ?
I would also love any thoughts from @jvican , I think you mentioned that you need this feature for an internal library, not sure if that's still valid.

The latest snapshot is 3.2.2+49-d027e5f7-SNAPSHOT and I have an example here: https://github.com/Avasil/akka-monix-local-example/blob/master/src/main/scala/AkkaHTTPExample.scala

@mdedetrich
Copy link
Contributor

@Avasil Unfortunately I don't really have the time to test this properly and at the same time I don't want to block this so I think unless there is a strong reason otherwise it makes sense to make a release with this functionality and improve on it later.

I am however still curious as to why the FastFuture doesn't work. Afaik FastFuture basically just runs every computation on the current thread (essentially ignoring the ExecutionContext) however since it is executing on the same thread I am not sure how a Local would lose the context.

@Avasil
Copy link
Collaborator Author

Avasil commented Oct 28, 2020

@mdedetrich

I am however still curious as to why the FastFuture doesn't work. Afaik FastFuture basically just runs every computation on the current thread (essentially ignoring the ExecutionContext) however since it is executing on the same thread I am not sure how a Local would lose the context.

The current implementation works because runToFuture returns CancelableFuture and I modified it to keep track of Local. It sets the proper Local in transform(With) operations so the isolated Local is kept inside Future continuations but it doesn't leak outside.

FastFuture overwrites these implementations and often operates on the current Thread but we restore the original Local on the original `Thread.

If we keep isolated Local on the current Thread then this test would break:

testAsync("Task.eval.runToFuture is isolated from outside changes") {
  implicit val s: Scheduler = Scheduler.Implicits.traced

  val local = Local(0)

  val t1 = for {
    i1 <- Task(local.get)
    _ <- Task.sleep(10.millis)
    i2 <- Task(local.get)
  } yield assertEquals(i1, i2)

  val f = t1.runToFuture
  local.update(100)
  f
}

I'm experimenting with an alternative solution: instead of returning the original Local, we could return a copy of the isolated one.

The first issue that comes to my mind is that it is not consistent with other "run" operators, like runAsync, or runSyncUnsafe which fully isolate the Task.

@Avasil
Copy link
Collaborator Author

Avasil commented Oct 28, 2020

I'm experimenting with an alternative solution: instead of returning the original Local, we could return a copy of the isolated one.
The first issue that comes to my mind is that it is not consistent with other "run" operators, like runAsync, or runSyncUnsafe which fully isolate the Task.

Well, as I think about it, it seems more and more like a bad idea. If we leave the copy on the current thread, then each subsequent request will start with previous request's metadata unless it's overwritten/cleared.

I will just release the current state and think about it more later, or perhaps leave it as is depending on user's feedback.

@Avasil Avasil changed the title WIP: Propagate Local isolation in runToFuture Propagate Local isolation in runToFuture Oct 28, 2020
# Conflicts:
#	monix-eval/shared/src/main/scala/monix/eval/internal/TaskRunLoop.scala
#	project/MimaFilters.scala
@jvican
Copy link
Collaborator

jvican commented Oct 29, 2020

So if I understand the PR correctly, we currently have support for isolation when we use runToFuture thanks to the instrumentation of CancelablePromise. Looks like this already solves an important pain point, does it make sense that we proceed and merge as is and we try to tackle the problems with FastFuture in a follow-up PR?

@Avasil
Copy link
Collaborator Author

Avasil commented Oct 29, 2020

does it make sense that we proceed and merge as is and we try to tackle the problems with FastFuture in a follow-up PR?

Yes, I don't want to delay it any further and would love to include it in the release in a few days

@Avasil Avasil merged commit 7a258d8 into monix:series/3.x Oct 30, 2020
@Avasil Avasil deleted the runToFuture-isolation branch November 14, 2020 15:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

TracingScheduler issues when mixing Future with Task
4 participants