Skip to content

Commit

Permalink
Use type classes instead of overloads in Local (#977)
Browse files Browse the repository at this point in the history
* Protect from error in Local.bind

* Use type class instead of overloads and pattern matching

* add mima filters

* scalafmt

* fix compilation error in Task

* Fix binary incompatibilities

* Local proposal

* Add more tests

* Move CompletableFuture tests to scala_2.12 suites

* Remove Task/Coeval instances

* Default implicit for CanBindLocals

* Fix doc

* Fix comment

* Disable 2.13.0-M5
  • Loading branch information
Avasil committed Aug 20, 2019
1 parent d61f2fe commit d590bd5
Show file tree
Hide file tree
Showing 15 changed files with 524 additions and 165 deletions.
17 changes: 9 additions & 8 deletions .travis.yml
Expand Up @@ -21,14 +21,15 @@ matrix:
- jdk: oraclejdk8
scala: 2.12.8
env: COMMAND=ci-js
# Scala 2.13, JVM
- jdk: oraclejdk8
scala: 2.13.0-M5
env: COMMAND=ci-jvm
# Scala 2.13, JavaScript
- jdk: oraclejdk8
scala: 2.13.0-M5
env: COMMAND=ci-js
# TODO: Upgrade to 2.13.0 when possible
# # Scala 2.13, JVM
# - jdk: oraclejdk8
# scala: 2.13.0-M5
# env: COMMAND=ci-jvm
# # Scala 2.13, JavaScript
# - jdk: oraclejdk8
# scala: 2.13.0-M5
# env: COMMAND=ci-js

env:
global:
Expand Down
8 changes: 5 additions & 3 deletions build.sbt
Expand Up @@ -32,7 +32,8 @@ val catsEffectVersion = "1.4.0"
val catsEffectLawsVersion = catsEffectVersion
val jcToolsVersion = "2.1.2"
val reactiveStreamsVersion = "1.0.2"
val minitestVersion = "2.3.2"
val minitestVersion = "2.6.0"
val implicitBoxVersion = "0.1.0"

def scalaTestVersion(scalaVersion: String) = CrossVersion.partialVersion(scalaVersion) match {
case Some((2, v)) if v >= 13 => "3.0.6-SNAP5"
Expand Down Expand Up @@ -66,7 +67,7 @@ lazy val warnUnusedImport = Seq(
lazy val sharedSettings = warnUnusedImport ++ Seq(
organization := "io.monix",
scalaVersion := "2.12.8",
crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.0-M5"),
crossScalaVersions := Seq("2.11.12", "2.12.8"),

scalacOptions ++= Seq(
// warnings
Expand Down Expand Up @@ -397,7 +398,8 @@ lazy val coreJS = project.in(file("monix/js"))
.settings(name := "monix")

lazy val executionCommon = crossVersionSharedSources ++ Seq(
name := "monix-execution"
name := "monix-execution",
libraryDependencies += "io.monix" %%% "implicitbox" % implicitBoxVersion
)

lazy val executionJVM = project.in(file("monix-execution/jvm"))
Expand Down
1 change: 1 addition & 0 deletions monix-eval/shared/src/main/scala/monix/eval/Coeval.scala
Expand Up @@ -27,6 +27,7 @@ import monix.execution.annotations.UnsafeBecauseImpure
import monix.execution.compat.BuildFrom
import monix.execution.compat.internal.newBuilder
import monix.execution.internal.Platform.fusionMaxStackDepth

import scala.annotation.unchecked.{uncheckedVariance => uV}
import scala.collection.mutable
import scala.util.control.NonFatal
Expand Down
2 changes: 1 addition & 1 deletion monix-eval/shared/src/main/scala/monix/eval/Task.scala
Expand Up @@ -581,7 +581,7 @@ sealed abstract class Task[+A] extends Serializable with TaskDeprecated.BinCompa
def runToFutureOpt(implicit s: Scheduler, opts: Options): CancelableFuture[A] = {
val opts2 = opts.withSchedulerFeatures
Local
.bindCurrentAsyncIf(opts2.localContextPropagation) {
.bindCurrentIf(opts2.localContextPropagation) {
TaskRunLoop.startFuture(this, s, opts2)
}
}
Expand Down
Expand Up @@ -269,7 +269,7 @@ object TaskLocal {
def isolate[A](task: Task[A]): Task[A] = checkPropagation {
Task {
val current = Local.getContext()
Local.setContext(current.mkIsolated)
Local.setContext(current.isolate())
current
}.bracket(_ => task)(backup => Task(Local.setContext(backup)))
}
Expand Down
Expand Up @@ -18,7 +18,8 @@
package monix.execution.schedulers

import monix.execution.internal.Trampoline
import scala.concurrent.ExecutionContext

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

/** A `scala.concurrentExecutionContext` implementation
* that executes runnables immediately, on the current thread,
Expand Down Expand Up @@ -50,15 +51,14 @@ import scala.concurrent.ExecutionContext
* @param underlying is the `ExecutionContext` to which the it defers
* to in case real asynchronous is needed
*/
final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContext {
final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContextExecutor {

private[this] val trampoline = new Trampoline(underlying)

override def execute(runnable: Runnable): Unit =
trampoline.execute(runnable)
override def reportFailure(t: Throwable): Unit =
underlying.reportFailure(t)

}

object TrampolineExecutionContext {
Expand Down
Expand Up @@ -18,8 +18,9 @@
package monix.execution.schedulers

import monix.execution.internal.Trampoline

import scala.util.control.NonFatal
import scala.concurrent.{BlockContext, CanAwait, ExecutionContext}
import scala.concurrent.{BlockContext, CanAwait, ExecutionContext, ExecutionContextExecutor}

/** A `scala.concurrentExecutionContext` implementation
* that executes runnables immediately, on the current thread,
Expand Down Expand Up @@ -51,7 +52,7 @@ import scala.concurrent.{BlockContext, CanAwait, ExecutionContext}
* @param underlying is the `ExecutionContext` to which the it defers
* to in case real asynchronous is needed
*/
final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContext {
final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContextExecutor {

private[this] val trampoline =
new ThreadLocal[Trampoline]() {
Expand Down
Expand Up @@ -21,7 +21,6 @@ import minitest.SimpleTestSuite
import monix.execution.{Cancelable, CancelableFuture, Scheduler}
import monix.execution.exceptions.DummyException
import monix.execution.schedulers.TracingScheduler

import scala.concurrent.Future

object LocalJVMSuite extends SimpleTestSuite {
Expand All @@ -43,6 +42,24 @@ object LocalJVMSuite extends SimpleTestSuite {
for (v <- f) yield assertEquals(v, 50)
}

testAsync("Local.isolate(CancelableFuture) should properly isolate during async boundaries") {
implicit val s = TracingScheduler(Scheduler.singleThread("local-test"))

val local = Local(0)

val f = for {
_ <- CancelableFuture(Future { local := 50 }, Cancelable())
_ <- Local.isolate {
CancelableFuture(Future {
local := 100
}, Cancelable())
}
v <- CancelableFuture(Future { local() }, Cancelable())
} yield v

for (v <- f) yield assertEquals(v, 50)
}

testAsync("Local.isolate should properly isolate during async boundaries on error") {
implicit val s = TracingScheduler(Scheduler.singleThread("local-test"))

Expand All @@ -61,14 +78,14 @@ object LocalJVMSuite extends SimpleTestSuite {
for (v <- f) yield assertEquals(v, 50)
}

testAsync("Local.bindCurrentIf should properly restore context during async boundaries") {
testAsync("Local.bindCurrentIf(CancelableFuture) should properly restore context during async boundaries") {
implicit val s = TracingScheduler(Scheduler.singleThread("local-test"))

val local = Local(0)

val f = for {
_ <- Future { local := 50 }
_ <- Local.bindCurrentAsyncIf(true)(CancelableFuture(Future {
_ <- Local.bindCurrentIf(true)(CancelableFuture(Future {
local := 100
}, Cancelable.empty))
v <- Future { local() }
Expand All @@ -77,6 +94,34 @@ object LocalJVMSuite extends SimpleTestSuite {
for (v <- f) yield assertEquals(v, 50)
}

testAsync("Local.bind(Local.defaultContext()) should restore context during async boundaries") {
implicit val s = TracingScheduler(Scheduler.singleThread("local-test"))

val local = Local(0)

val f = for {
_ <- Future { local := 50 }
_ <- Local.bind(Local.newContext()) { Future { local := 100 } }
v <- Future { local() }
} yield v

for (v <- f) yield assertEquals(v, 50)
}

testAsync("Local.bindClear should restore context during async boundaries") {
implicit val s = TracingScheduler(Scheduler.singleThread("local-test"))

val local = Local(0)

val f = for {
_ <- Future { local := 50 }
_ <- Local.bindClear { Future { local := 100 } }
v <- Future { local() }
} yield v

for (v <- f) yield assertEquals(v, 50)
}

testAsync("local.bind should properly restore context during async boundaries") {
implicit val s = TracingScheduler(Scheduler.singleThread("local-test"))

Expand Down
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2014-2019 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.execution

import java.util.concurrent.CompletableFuture
import java.util.function.{BiFunction, Supplier}

import minitest.SimpleTestSuite
import monix.execution.misc.Local
import monix.execution.schedulers.TracingScheduler

object CompletableFutureLocalSuite extends SimpleTestSuite {
testAsync("Local.isolate(CompletableFuture) should properly isolate during async boundaries") {
implicit val s = TracingScheduler(Scheduler.singleThread("local-test"))

val local = Local(0)

val cf = CompletableFuture
.supplyAsync(new Supplier[Any] {
override def get(): Any = local := 50
}, s)

val cf2 =
Local.isolate {
cf.handleAsync(new BiFunction[Any, Throwable, Any] {
def apply(r: Any, error: Throwable): Any = {
local := 100
}
}, s)
}.handleAsync(new BiFunction[Any, Throwable, Any] {
def apply(r: Any, error: Throwable): Any = {
local()
}
}, s)

for (v <- FutureUtils.fromJavaCompletable(cf2)) yield assertEquals(v.asInstanceOf[Int], 50)
}
}

0 comments on commit d590bd5

Please sign in to comment.