diff --git a/.travis.yml b/.travis.yml index ba8f98815..775130940 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,14 +21,15 @@ matrix: - jdk: oraclejdk8 scala: 2.12.8 env: COMMAND=ci-js - # Scala 2.13, JVM - - jdk: oraclejdk8 - scala: 2.13.0-M5 - env: COMMAND=ci-jvm - # Scala 2.13, JavaScript - - jdk: oraclejdk8 - scala: 2.13.0-M5 - env: COMMAND=ci-js +# TODO: Upgrade to 2.13.0 when possible +# # Scala 2.13, JVM +# - jdk: oraclejdk8 +# scala: 2.13.0-M5 +# env: COMMAND=ci-jvm +# # Scala 2.13, JavaScript +# - jdk: oraclejdk8 +# scala: 2.13.0-M5 +# env: COMMAND=ci-js env: global: diff --git a/build.sbt b/build.sbt index 7cf5e1c3f..5c091e301 100644 --- a/build.sbt +++ b/build.sbt @@ -32,7 +32,8 @@ val catsEffectVersion = "1.4.0" val catsEffectLawsVersion = catsEffectVersion val jcToolsVersion = "2.1.2" val reactiveStreamsVersion = "1.0.2" -val minitestVersion = "2.3.2" +val minitestVersion = "2.6.0" +val implicitBoxVersion = "0.1.0" def scalaTestVersion(scalaVersion: String) = CrossVersion.partialVersion(scalaVersion) match { case Some((2, v)) if v >= 13 => "3.0.6-SNAP5" @@ -66,7 +67,7 @@ lazy val warnUnusedImport = Seq( lazy val sharedSettings = warnUnusedImport ++ Seq( organization := "io.monix", scalaVersion := "2.12.8", - crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.0-M5"), + crossScalaVersions := Seq("2.11.12", "2.12.8"), scalacOptions ++= Seq( // warnings @@ -397,7 +398,8 @@ lazy val coreJS = project.in(file("monix/js")) .settings(name := "monix") lazy val executionCommon = crossVersionSharedSources ++ Seq( - name := "monix-execution" + name := "monix-execution", + libraryDependencies += "io.monix" %%% "implicitbox" % implicitBoxVersion ) lazy val executionJVM = project.in(file("monix-execution/jvm")) diff --git a/monix-eval/shared/src/main/scala/monix/eval/Coeval.scala b/monix-eval/shared/src/main/scala/monix/eval/Coeval.scala index 27be617d7..c875b033a 100644 --- a/monix-eval/shared/src/main/scala/monix/eval/Coeval.scala +++ b/monix-eval/shared/src/main/scala/monix/eval/Coeval.scala @@ -27,6 +27,7 @@ import monix.execution.annotations.UnsafeBecauseImpure import monix.execution.compat.BuildFrom import monix.execution.compat.internal.newBuilder import monix.execution.internal.Platform.fusionMaxStackDepth + import scala.annotation.unchecked.{uncheckedVariance => uV} import scala.collection.mutable import scala.util.control.NonFatal diff --git a/monix-eval/shared/src/main/scala/monix/eval/Task.scala b/monix-eval/shared/src/main/scala/monix/eval/Task.scala index 135d2993d..5c0937b8c 100644 --- a/monix-eval/shared/src/main/scala/monix/eval/Task.scala +++ b/monix-eval/shared/src/main/scala/monix/eval/Task.scala @@ -581,7 +581,7 @@ sealed abstract class Task[+A] extends Serializable with TaskDeprecated.BinCompa def runToFutureOpt(implicit s: Scheduler, opts: Options): CancelableFuture[A] = { val opts2 = opts.withSchedulerFeatures Local - .bindCurrentAsyncIf(opts2.localContextPropagation) { + .bindCurrentIf(opts2.localContextPropagation) { TaskRunLoop.startFuture(this, s, opts2) } } diff --git a/monix-eval/shared/src/main/scala/monix/eval/TaskLocal.scala b/monix-eval/shared/src/main/scala/monix/eval/TaskLocal.scala index aaecd97fd..b0416c8c2 100644 --- a/monix-eval/shared/src/main/scala/monix/eval/TaskLocal.scala +++ b/monix-eval/shared/src/main/scala/monix/eval/TaskLocal.scala @@ -269,7 +269,7 @@ object TaskLocal { def isolate[A](task: Task[A]): Task[A] = checkPropagation { Task { val current = Local.getContext() - Local.setContext(current.mkIsolated) + Local.setContext(current.isolate()) current }.bracket(_ => task)(backup => Task(Local.setContext(backup))) } diff --git a/monix-execution/js/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala b/monix-execution/js/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala index 44c5e4ca9..d2c83ada5 100644 --- a/monix-execution/js/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala +++ b/monix-execution/js/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala @@ -18,7 +18,8 @@ package monix.execution.schedulers import monix.execution.internal.Trampoline -import scala.concurrent.ExecutionContext + +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} /** A `scala.concurrentExecutionContext` implementation * that executes runnables immediately, on the current thread, @@ -50,7 +51,7 @@ import scala.concurrent.ExecutionContext * @param underlying is the `ExecutionContext` to which the it defers * to in case real asynchronous is needed */ -final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContext { +final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContextExecutor { private[this] val trampoline = new Trampoline(underlying) @@ -58,7 +59,6 @@ final class TrampolineExecutionContext private (underlying: ExecutionContext) ex trampoline.execute(runnable) override def reportFailure(t: Throwable): Unit = underlying.reportFailure(t) - } object TrampolineExecutionContext { diff --git a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala index 75093d2a3..907794db8 100644 --- a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala +++ b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala @@ -18,8 +18,9 @@ package monix.execution.schedulers import monix.execution.internal.Trampoline + import scala.util.control.NonFatal -import scala.concurrent.{BlockContext, CanAwait, ExecutionContext} +import scala.concurrent.{BlockContext, CanAwait, ExecutionContext, ExecutionContextExecutor} /** A `scala.concurrentExecutionContext` implementation * that executes runnables immediately, on the current thread, @@ -51,7 +52,7 @@ import scala.concurrent.{BlockContext, CanAwait, ExecutionContext} * @param underlying is the `ExecutionContext` to which the it defers * to in case real asynchronous is needed */ -final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContext { +final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContextExecutor { private[this] val trampoline = new ThreadLocal[Trampoline]() { diff --git a/monix-execution/jvm/src/test/scala/monix/execution/misc/LocalJVMSuite.scala b/monix-execution/jvm/src/test/scala/monix/execution/misc/LocalJVMSuite.scala index 5e7be9ae5..e4ee3aebf 100644 --- a/monix-execution/jvm/src/test/scala/monix/execution/misc/LocalJVMSuite.scala +++ b/monix-execution/jvm/src/test/scala/monix/execution/misc/LocalJVMSuite.scala @@ -21,7 +21,6 @@ import minitest.SimpleTestSuite import monix.execution.{Cancelable, CancelableFuture, Scheduler} import monix.execution.exceptions.DummyException import monix.execution.schedulers.TracingScheduler - import scala.concurrent.Future object LocalJVMSuite extends SimpleTestSuite { @@ -43,6 +42,24 @@ object LocalJVMSuite extends SimpleTestSuite { for (v <- f) yield assertEquals(v, 50) } + testAsync("Local.isolate(CancelableFuture) should properly isolate during async boundaries") { + implicit val s = TracingScheduler(Scheduler.singleThread("local-test")) + + val local = Local(0) + + val f = for { + _ <- CancelableFuture(Future { local := 50 }, Cancelable()) + _ <- Local.isolate { + CancelableFuture(Future { + local := 100 + }, Cancelable()) + } + v <- CancelableFuture(Future { local() }, Cancelable()) + } yield v + + for (v <- f) yield assertEquals(v, 50) + } + testAsync("Local.isolate should properly isolate during async boundaries on error") { implicit val s = TracingScheduler(Scheduler.singleThread("local-test")) @@ -61,14 +78,14 @@ object LocalJVMSuite extends SimpleTestSuite { for (v <- f) yield assertEquals(v, 50) } - testAsync("Local.bindCurrentIf should properly restore context during async boundaries") { + testAsync("Local.bindCurrentIf(CancelableFuture) should properly restore context during async boundaries") { implicit val s = TracingScheduler(Scheduler.singleThread("local-test")) val local = Local(0) val f = for { _ <- Future { local := 50 } - _ <- Local.bindCurrentAsyncIf(true)(CancelableFuture(Future { + _ <- Local.bindCurrentIf(true)(CancelableFuture(Future { local := 100 }, Cancelable.empty)) v <- Future { local() } @@ -77,6 +94,34 @@ object LocalJVMSuite extends SimpleTestSuite { for (v <- f) yield assertEquals(v, 50) } + testAsync("Local.bind(Local.defaultContext()) should restore context during async boundaries") { + implicit val s = TracingScheduler(Scheduler.singleThread("local-test")) + + val local = Local(0) + + val f = for { + _ <- Future { local := 50 } + _ <- Local.bind(Local.newContext()) { Future { local := 100 } } + v <- Future { local() } + } yield v + + for (v <- f) yield assertEquals(v, 50) + } + + testAsync("Local.bindClear should restore context during async boundaries") { + implicit val s = TracingScheduler(Scheduler.singleThread("local-test")) + + val local = Local(0) + + val f = for { + _ <- Future { local := 50 } + _ <- Local.bindClear { Future { local := 100 } } + v <- Future { local() } + } yield v + + for (v <- f) yield assertEquals(v, 50) + } + testAsync("local.bind should properly restore context during async boundaries") { implicit val s = TracingScheduler(Scheduler.singleThread("local-test")) diff --git a/monix-execution/jvm/src/test/scala_2.12/monix/execution/CompletableFutureLocalSuite.scala b/monix-execution/jvm/src/test/scala_2.12/monix/execution/CompletableFutureLocalSuite.scala new file mode 100644 index 000000000..a8c51bb05 --- /dev/null +++ b/monix-execution/jvm/src/test/scala_2.12/monix/execution/CompletableFutureLocalSuite.scala @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2014-2019 by The Monix Project Developers. + * See the project homepage at: https://monix.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.execution + +import java.util.concurrent.CompletableFuture +import java.util.function.{BiFunction, Supplier} + +import minitest.SimpleTestSuite +import monix.execution.misc.Local +import monix.execution.schedulers.TracingScheduler + +object CompletableFutureLocalSuite extends SimpleTestSuite { + testAsync("Local.isolate(CompletableFuture) should properly isolate during async boundaries") { + implicit val s = TracingScheduler(Scheduler.singleThread("local-test")) + + val local = Local(0) + + val cf = CompletableFuture + .supplyAsync(new Supplier[Any] { + override def get(): Any = local := 50 + }, s) + + val cf2 = + Local.isolate { + cf.handleAsync(new BiFunction[Any, Throwable, Any] { + def apply(r: Any, error: Throwable): Any = { + local := 100 + } + }, s) + }.handleAsync(new BiFunction[Any, Throwable, Any] { + def apply(r: Any, error: Throwable): Any = { + local() + } + }, s) + + for (v <- FutureUtils.fromJavaCompletable(cf2)) yield assertEquals(v.asInstanceOf[Int], 50) + } +} diff --git a/monix-execution/shared/src/main/scala/monix/execution/misc/CanBindLocals.scala b/monix-execution/shared/src/main/scala/monix/execution/misc/CanBindLocals.scala new file mode 100644 index 000000000..470714cd4 --- /dev/null +++ b/monix-execution/shared/src/main/scala/monix/execution/misc/CanBindLocals.scala @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2014-2019 by The Monix Project Developers. + * See the project homepage at: https://monix.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.execution.misc + +import java.util.concurrent.CompletableFuture +import java.util.function.BiFunction + +import implicitbox.Not +import monix.execution.{CancelableFuture, FutureUtils} +import monix.execution.schedulers.TrampolineExecutionContext + +import scala.annotation.implicitNotFound +import scala.concurrent.Future + +/** + * Type class describing how [[Local]] binding works for specific data types. + * + * This is needed because asynchronous data types, like `Future`, + * that can be waited on, should also clear the modified context + * after completion. + * + * NOTE: this type class does not work for data types that suspend the + * execution, like `Coeval` or `Task`, because [[Local]] is meant to + * be used in a side effectful way. Instances of this type class + * can't be implemented for data types like `Task`, as a technical + * limitation, because `Task` would also need a suspended `Context` + * evaluation in `bindContext`. + */ +@implicitNotFound("""Cannot find an implicit value for CanBindLocals[${R}]. +If ${R} is the result of a synchronous action, either build an implicit with +CanBindLocals.synchronous or import CanBindLocals.Implicits.synchronousAsDefault.""") +trait CanBindLocals[R] { + /** See [[monix.execution.misc.Local.bind[R](ctx* Local.bind]]. */ + def bindContext(ctx: Local.Context)(f: => R): R + + /** See [[monix.execution.misc.Local.bind[R](value* Local.bind]]. */ + def bindKey[A](local: Local[A], value: Option[A])(f: => R): R = + bindContext(Local.getContext().bind(local.key, value))(f) + + /** See [[Local.isolate]]. */ + def isolate(f: => R): R = + bindContext(Local.getContext().isolate())(f) +} + +object CanBindLocals extends CanIsolateInstancesLevel1 { + def apply[R](implicit R: CanBindLocals[R]): CanBindLocals[R] = R +} + +private[misc] abstract class CanIsolateInstancesLevel1 extends CanIsolateInstancesLevel0 { + /** + * Instance for `monix.execution.CancelableFuture`. + */ + implicit def cancelableFuture[R]: CanBindLocals[CancelableFuture[R]] = + CancelableFutureInstance.asInstanceOf[CanBindLocals[CancelableFuture[R]]] + + /** + * Instance for `java.util.concurrent.CompletableFuture`. + */ + implicit def completableFuture[R]: CanBindLocals[CompletableFuture[R]] = + CompletableFutureInstance.asInstanceOf[CanBindLocals[CompletableFuture[R]]] + + object Implicits { + /** + * Implicit instance for all things synchronous. + * + * Needs to be imported explicitly in scope. Will NOT override + * other `CanBindLocals` implicits that are already visible. + */ + @inline implicit def synchronousAsDefault[R](implicit ev: Not[CanBindLocals[R]]): CanBindLocals[R] = + CanBindLocals.synchronous[R] + } +} + +private[misc] abstract class CanIsolateInstancesLevel0 { + /** + * Instance for `scala.concurrent.Future`. + */ + implicit def future[R]: CanBindLocals[Future[R]] = + FutureInstance.asInstanceOf[CanBindLocals[Future[R]]] + + /** + * Instance for `Unit`. + */ + @inline implicit def forUnit: CanBindLocals[Unit] = + synchronous[Unit] + + /** + * Builds an instance for synchronous execution. + * + * {{{ + * import monix.execution.misc._ + * implicit val ev = CanBindLocals.synchronous[String] + * + * // If not provided explicitly, it might trigger compilation error + * // due to requirement for CanBindLocals[String] + * Local.bindClear { + * "Hello!" + * } + * }}} + */ + def synchronous[R]: CanBindLocals[R] = + SynchronousInstance.asInstanceOf[CanBindLocals[R]] + + /** Implementation for [[CanBindLocals.synchronous]]. */ + protected object SynchronousInstance extends CanBindLocals[Any] { + override def bindContext(ctx: Local.Context)(f: => Any): Any = { + val prev = Local.getContext() + Local.setContext(ctx) + try f + finally Local.setContext(prev) + } + } + + /** Implementation for [[CanBindLocals.cancelableFuture]]. */ + protected object CancelableFutureInstance extends CanBindLocals[CancelableFuture[Any]] { + override def bindContext(ctx: Local.Context)(f: => CancelableFuture[Any]): CancelableFuture[Any] = { + val prev = Local.getContext() + Local.setContext(ctx) + + try { + f.transform { result => + Local.setContext(prev) + result + }(TrampolineExecutionContext.immediate) + } finally { + Local.setContext(prev) + } + } + } + + /** Implementation for [[CanBindLocals.future]]. */ + protected object FutureInstance extends CanBindLocals[Future[Any]] { + override def bindContext(ctx: Local.Context)(f: => Future[Any]): Future[Any] = { + val prev = Local.getContext() + Local.setContext(ctx) + + try { + FutureUtils + .transform[Any, Any](f, result => { + Local.setContext(prev) + result + })(TrampolineExecutionContext.immediate) + } finally { + Local.setContext(prev) + } + } + } + + /** Implementation for [[CanBindLocals.completableFuture]]. */ + protected object CompletableFutureInstance extends CanBindLocals[CompletableFuture[Any]] { + override def bindContext(ctx: Local.Context)(f: => CompletableFuture[Any]): CompletableFuture[Any] = { + val prev = Local.getContext() + Local.setContext(ctx) + + try { + f.handleAsync( + new BiFunction[Any, Throwable, Any] { + def apply(r: Any, error: Throwable): Any = { + Local.setContext(prev) + if (error != null) throw error + else r + } + }, + TrampolineExecutionContext.immediate + ) + } finally { + Local.setContext(prev) + } + } + } +} diff --git a/monix-execution/shared/src/main/scala/monix/execution/misc/Local.scala b/monix-execution/shared/src/main/scala/monix/execution/misc/Local.scala index 87fcc64ed..ab1dc544d 100644 --- a/monix-execution/shared/src/main/scala/monix/execution/misc/Local.scala +++ b/monix-execution/shared/src/main/scala/monix/execution/misc/Local.scala @@ -17,15 +17,25 @@ package monix.execution.misc -import monix.execution.{CancelableFuture, FutureUtils} import monix.execution.atomic.AtomicAny -import monix.execution.schedulers.TrampolineExecutionContext import scala.annotation.tailrec -import scala.concurrent.Future import scala.reflect.macros.whitebox -import scala.util.Try -object Local { +/** + * @define canBindLocalsDesc The implementation uses the [[CanBindLocals]] + * type class because in case of asynchronous data types that + * should be waited on, like `Future` or `CompletableFuture`, + * then the locals context also needs to be cleared on the + * future's completion, for correctness. + * + * There's no default instance for synchronous actions available + * in scope. If you need to work with synchronous actions, you + * need to import it explicitly: + * {{{ + * import monix.execution.misc.CanBindLocals.Implicits.synchronousAsDefault + * }}} + */ +object Local extends LocalCompanionDeprecated { /** Builds a new [[Local]] with the given `default` to be returned * if a value hasn't been set, or after the local gets cleared. * @@ -46,11 +56,14 @@ object Local { /** Internal — key type used in [[Context]]. */ final class Key extends Serializable - def defaultContext(): Local.Context = new Unbound(AtomicAny(Map())) + /** + * Creates a new, empty [[Context]]. + */ + def newContext(): Context = new Unbound(AtomicAny(Map())) /** Current [[Context]] kept in a `ThreadLocal`. */ private[this] val localContext: ThreadLocal[Context] = - ThreadLocal(defaultContext()) + ThreadLocal(newContext()) /** Return the state of the current Local state. */ def getContext(): Context = @@ -62,57 +75,39 @@ object Local { /** Clear the Local state. */ def clearContext(): Unit = - localContext.set(defaultContext()) - - /** Execute a synchronous block of code without propagating any `Local.Context` - * changes outside. - */ - def isolate[R](f: => R): R = - macro Macros.isolate + localContext.set(newContext()) - /** Execute an asynchronous block of code without propagating any `Local.Context` + /** Execute a block of code without propagating any `Local.Context` * changes outside. + * + * $canBindLocalsDesc */ - def isolate[R](f: => Future[R]): Future[R] = { - // TODO: rewrite as macro - val prev = Local.getContext() - val current = Local.getContext().mkIsolated - Local.setContext(current) - - try { - FutureUtils.transform[R, R](f, result => { - Local.setContext(prev) - result - })(TrampolineExecutionContext.immediate) - } finally { - Local.setContext(prev) - } - } + def isolate[R](f: => R)(implicit R: CanBindLocals[R]): R = + R.isolate(f) /** Execute a block of code using the specified state of * `Local.Context` and restore the current state when complete. + * + * $canBindLocalsDesc */ - def bind[R](ctx: Context)(f: => R): R = - macro Macros.localLet + def bind[R](ctx: Context)(f: => R)(implicit R: CanBindLocals[R]): R = + R.bindContext(ctx)(f) /** Execute a block of code with a clear state of `Local.Context` * and restore the current state when complete. + * + * $canBindLocalsDesc */ - def bindClear[R](f: => R): R = - macro Macros.localLetClear + def bindClear[R](f: => R)(implicit R: CanBindLocals[R]): R = + CanBindLocals[R].bindContext(Local.newContext())(f) /** Convert a closure `() => R` into another closure of the same * type whose [[Local.Context]] is saved when calling closed * and restored upon invocation. */ - def closed[R](fn: () => R): () => R = { + def closed[R](fn: () => R)(implicit R: CanBindLocals[R]): () => R = { val closure = Local.getContext() - () => { - val save = Local.getContext() - Local.setContext(closure) - try fn() - finally Local.setContext(save) - } + () => Local.bind(closure)(fn()) } private def getKey[A](key: Key): Option[A] = @@ -142,65 +137,30 @@ object Local { private[monix] def bindCurrentIf[R](b: Boolean)(f: => R): R = macro Macros.localLetCurrentIf - /** If `b` evaluates to `true`, execute an asynchronous block of code using a current - * state of `Local.Context` and restore the current state when complete. - */ - private[monix] def bindCurrentAsyncIf[R](b: Boolean)(f: => CancelableFuture[R]): CancelableFuture[R] = { - if (b) { - // inlined implementation of `isolate` for `CancelableFuture` - val prev = Local.getContext() - val current = Local.getContext().mkIsolated - Local.setContext(current) - - try { - f.transform(result => { - Local.setContext(prev) - result - })(TrampolineExecutionContext.immediate) - } finally { - Local.setContext(prev) - } - } else f - } - /** Macros implementations for [[bind]] and [[bindClear]]. */ private class Macros(override val c: whitebox.Context) extends InlineMacros with HygieneUtilMacros { import c.universe._ - def localLet(ctx: Tree)(f: Tree): Tree = { - // TODO - reduce copy-paste in localLetXXX macros - val ctxRef = util.name("ctx") - val saved = util.name("saved") - val Local = symbolOf[Local[_]].companion - val AnyRefSym = symbolOf[AnyRef] - - resetTree(q""" - val $ctxRef = ($ctx) - if (($ctxRef : $AnyRefSym) eq null) { - $f - } else { - val $saved = $Local.getContext() - $Local.setContext($ctxRef) - try { $f } finally { $Local.setContext($saved) } - } - """) - } - - def localLetClear(f: Tree): Tree = { - val Local = symbolOf[Local[_]].companion - localLet(q"$Local.defaultContext()")(f) - } - + def localLet(ctx: Tree)(f: Tree): Tree = + c.abort(c.macroApplication.pos, "Macro no longer implemented!") + def localLetClear(f: Tree): Tree = + c.abort(c.macroApplication.pos, "Macro no longer implemented!") def isolate(f: Tree): Tree = - localLet(q"${symbolOf[Local[_]].companion}.getContext().mkIsolated")(f) + c.abort(c.macroApplication.pos, "Macro no longer implemented!") def localLetCurrentIf(b: Tree)(f: Tree): Tree = { - resetTree(q""" - if (!$b) { $f } - else ${isolate(f)} - """) + val Local = symbolOf[Local[_]].companion + val CanBindLocals = symbolOf[CanBindLocals[_]].companion + + resetTree( + q"""if (!$b) { $f } else { + import $CanBindLocals.Implicits.synchronousAsDefault + $Local.isolate($f) + }""" + ) } } + /** Represents the current state of all [[Local locals]] for a given * execution context. * @@ -243,7 +203,23 @@ object Local { r } - final def mkIsolated: Context = { + final def bind(key: Key, value: Option[Any]): Context = + new Bound(key, value.orNull, value.isDefined, this) + + final def isolate(): Context = + isolateLoop() + + /** + * DEPRECATED — renamed to [[isolate]]. + */ + @deprecated("Renamed to isolate()", "3.0.0") + private[misc] def mkIsolated(): Unbound = { + // $COVERAGE-OFF$ + isolateLoop() + // $COVERAGE-ON$ + } + + private[this] final def isolateLoop(): Unbound = this match { case unbound: Unbound => val map = unbound.ref.get() @@ -271,14 +247,11 @@ object Local { } new Unbound(AtomicAny(map)) } - } - - final def bind(key: Key, value: Option[Any]): Context = - new Bound(key, value.orNull, value.isDefined, this) } - private final class Unbound(val ref: AtomicAny[Map[Key, Any]]) extends Context - private final class Bound( + private[execution] final class Unbound(val ref: AtomicAny[Map[Key, Any]]) extends Context + + private[execution] final class Bound( val key: Key, @volatile var value: Any, @volatile var hasValue: Boolean, @@ -297,8 +270,14 @@ object Local { * * Note: the implementation is optimized for situations in which save * and restore optimizations are dominant. + * + * @define canBindLocalsDesc The implementation uses the [[CanBindLocals]] + * type class because in case of asynchronous data types that + * should be waited on, like `Future` or `CompletableFuture`, + * then the locals context also needs to be cleared on the + * future's completion, for correctness. */ -final class Local[A](default: () => A) { +final class Local[A](default: () => A) extends LocalDeprecated[A] { import Local.Key val key: Key = new Key @@ -334,48 +313,19 @@ final class Local[A](default: () => A) { /** Execute a block with a specific local value, restoring the * current state upon completion. + * + * $canBindLocalsDesc */ - def bind[R](value: A)(f: => R): R = { - val parent = Local.getContext() - Local.setContext(parent.bind(key, Some(value))) - - f match { - case future: Future[_] => - FutureUtils - .transform(future, (result: Try[_]) => { - Local.setContext(parent) - result - })(TrampolineExecutionContext.immediate) - .asInstanceOf[R] - - case _ => - try f - finally Local.setContext(parent) - - } - } + def bind[R](value: A)(f: => R)(implicit R: CanBindLocals[R]): R = + R.bindKey(this, Some(value))(f) /** Execute a block with the `Local` cleared, restoring the current * state upon completion. + * + * $canBindLocalsDesc */ - def bindClear[R](f: => R): R = { - val parent = Local.getContext() - Local.setContext(parent.bind(key, None)) - - f match { - case future: Future[_] => - FutureUtils - .transform(future, (result: Try[_]) => { - Local.setContext(parent) - result - })(TrampolineExecutionContext.immediate) - .asInstanceOf[R] - - case _ => - try f - finally Local.setContext(parent) - } - } + def bindClear[R](f: => R)(implicit R: CanBindLocals[R]): R = + R.bindKey(this, None)(f) /** Clear the Local's value. Other [[Local Locals]] are not modified. * diff --git a/monix-execution/shared/src/main/scala/monix/execution/misc/LocalDeprecated.scala b/monix-execution/shared/src/main/scala/monix/execution/misc/LocalDeprecated.scala new file mode 100644 index 000000000..c4f695143 --- /dev/null +++ b/monix-execution/shared/src/main/scala/monix/execution/misc/LocalDeprecated.scala @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2014-2019 by The Monix Project Developers. + * See the project homepage at: https://monix.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.execution.misc + +import monix.execution.atomic.AtomicAny + +private[execution] trait LocalDeprecated[A] { self: Local[A] => + /** + * DEPRECATED — switch to `local.bind[R: CanIsolate]`. + */ + @deprecated("Switch to local.bind[R: CanIsolate]", since = "3.0.0") + private[misc] def bind[R](value: A)(f: => R): R = { + // $COVERAGE-OFF$ + CanBindLocals.synchronous[R].bindKey(self, Some(value))(f) + // $COVERAGE-ON$ + } + + /** + * DEPRECATED — switch to `local.bindClear[R: CanIsolate]`. + */ + @deprecated("Switch to local.bindClear[R: CanIsolate]", since = "3.0.0") + private[misc] def bindClear[R](f: => R): R = { + // $COVERAGE-OFF$ + val parent = Local.getContext() + CanBindLocals.synchronous[R].bindContext(parent.bind(key, None))(f) + // $COVERAGE-ON$ + } +} + +private[execution] trait LocalCompanionDeprecated { self: Local.type => + /** + * DEPRECATED — switch to [[Local.newContext]]. + */ + @deprecated("Renamed to Local.newContext", "3.0.0") + def defaultContext(): Local.Unbound = { + // $COVERAGE-OFF$ + new Unbound(AtomicAny(Map.empty)) + // $COVERAGE-ON$ + } + + /** + * DEPRECATED — switch to `local.closed[R: CanIsolate]`. + */ + @deprecated("Switch to local.closed[R: CanIsolate]", since = "3.0.0") + def closed[R](fn: () => R): () => R = { + // $COVERAGE-OFF$ + import CanBindLocals.Implicits.synchronousAsDefault + Local.closed(fn)(implicitly[CanBindLocals[R]]) + // $COVERAGE-ON$ + } +} diff --git a/monix-execution/shared/src/test/scala/monix/execution/misc/CanBindLocalsSuite.scala b/monix-execution/shared/src/test/scala/monix/execution/misc/CanBindLocalsSuite.scala new file mode 100644 index 000000000..400a8c13a --- /dev/null +++ b/monix-execution/shared/src/test/scala/monix/execution/misc/CanBindLocalsSuite.scala @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2014-2019 by The Monix Project Developers. + * See the project homepage at: https://monix.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.execution.misc + +import minitest.SimpleTestSuite +import monix.execution.CancelableFuture + +import scala.concurrent.Future + +object CanBindLocalsSuite extends SimpleTestSuite { + class MySimpleType + class MyType[A] + + test("default implicits") { + val ev1 = implicitly[CanBindLocals[CancelableFuture[Int]]] + val ev2 = implicitly[CanBindLocals[Future[Int]]] + val ev3 = implicitly[CanBindLocals[Unit]] + + assert(ev1 != ev2, "ev1 != ev2") + assert(ev1.asInstanceOf[Any] != ev3.asInstanceOf[Any], "ev1 != ev3") + + assertDoesNotCompile("implicitly[CanBindLocals[MySimpleType]]", "Cannot find an implicit.*") + assertDoesNotCompile("implicitly[CanBindLocals[MyType[String]]]", "Cannot find an implicit.*") + assertDoesNotCompile("implicitly[CanBindLocals[Int]]", "Cannot find an implicit.*") + } + + test("import CanBindLocals.Implicits.synchronousAsDefault") { + import CanBindLocals.Implicits.synchronousAsDefault + + val ev1 = implicitly[CanBindLocals[MySimpleType]] + val ev2 = implicitly[CanBindLocals[MyType[String]]] + val ev3 = implicitly[CanBindLocals[CancelableFuture[Int]]] + val ev4 = implicitly[CanBindLocals[Future[Int]]] + val ev5 = implicitly[CanBindLocals[Unit]] + val ev6 = implicitly[CanBindLocals[Int]] + + assertEquals(ev1, ev2) + assertEquals(ev1, ev5) + assertEquals(ev1, ev6) + + assert(ev1.asInstanceOf[Any] != ev3.asInstanceOf[Any], "ev1 != ev3") + assert(ev1.asInstanceOf[Any] != ev4.asInstanceOf[Any], "ev1 != ev3") + } +} diff --git a/monix-execution/shared/src/test/scala/monix/execution/misc/LocalSuite.scala b/monix-execution/shared/src/test/scala/monix/execution/misc/LocalSuite.scala index 2ecddb881..45485c3df 100644 --- a/monix-execution/shared/src/test/scala/monix/execution/misc/LocalSuite.scala +++ b/monix-execution/shared/src/test/scala/monix/execution/misc/LocalSuite.scala @@ -19,10 +19,8 @@ package monix.execution.misc import cats.Eval import minitest.SimpleTestSuite -import monix.execution.Scheduler -import monix.execution.exceptions.DummyException import monix.execution.schedulers.{TestScheduler, TracingScheduler} - +import monix.execution.misc.CanBindLocals.Implicits.synchronousAsDefault import scala.concurrent.Future import scala.util.Success @@ -75,8 +73,8 @@ object LocalSuite extends SimpleTestSuite { local2 := 100 val f = Local.isolate { + local1 := 100 Future { - local1 := 100 local1.get + local2.get } } diff --git a/project/MimaFilters.scala b/project/MimaFilters.scala index 050efaedf..c48b34a0f 100644 --- a/project/MimaFilters.scala +++ b/project/MimaFilters.scala @@ -7,11 +7,8 @@ object MimaFilters { // Should not be a problem, but I'm not absolutely sure exclude[MissingTypesProblem]("monix.execution.exceptions.APIContractViolationException"), // Breaking changes for https://github.com/monix/monix/pull/960 + // Should only be a problem for Scala 2.11 exclude[ReversedMissingMethodProblem]("monix.execution.Scheduler.features"), - // Local changes :-( - exclude[IncompatibleResultTypeProblem]("monix.execution.misc.Local.defaultContext"), - exclude[IncompatibleResultTypeProblem]("monix.execution.misc.Local.defaultContext"), - exclude[IncompatibleResultTypeProblem]("monix.execution.misc.Local#Context.mkIsolated"), // Internals exclude[MissingClassProblem]("monix.eval.Task$DoOnFinish"), exclude[MissingClassProblem]("monix.eval.internal.TaskConnection$TrampolinedWithConn"),