Skip to content

Commit

Permalink
Add Task.runSyncUnsafe (#547)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandru committed Jan 19, 2018
1 parent 88875c1 commit e6252c4
Show file tree
Hide file tree
Showing 9 changed files with 729 additions and 105 deletions.
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.eval.internal

import monix.eval.Task
import monix.execution.Scheduler
import scala.concurrent.duration.Duration

private[eval] object TaskRunSyncUnsafe {
/** Implementation of `Task.runSyncUnsafe`, meant to throw an
* "unsupported exception", since JavaScript cannot support it.
*/
def apply[A](source: Task[A], timeout: Duration, scheduler: Scheduler, opts: Task.Options): A = {
// $COVERAGE-OFF$
throw new UnsupportedOperationException("runSyncUnsafe isn't supported on top of JavaScript")
// $COVERAGE-ON$
}
}
6 changes: 2 additions & 4 deletions monix-eval/jvm/src/main/scala/monix/eval/TaskApp.scala
Expand Up @@ -18,8 +18,7 @@
package monix.eval

import monix.execution.Scheduler
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.duration.Duration.Inf

/** Safe `App` type that runs a [[Task]] action.
*
Expand Down Expand Up @@ -56,7 +55,6 @@ trait TaskApp {
Coeval.evalOnce(Task.defaultOptions)

final def main(args: Array[String]): Unit = {
val f = run(args).runAsyncOpt(scheduler.value, options.value)
Await.result(f, Duration.Inf)
run(args).runSyncUnsafeOpt(Inf)(scheduler.value, options.value, implicitly)
}
}
@@ -0,0 +1,209 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.eval.internal

import java.util.concurrent.TimeoutException
import java.util.concurrent.locks.AbstractQueuedSynchronizer

import monix.eval.Task.{Async, Context, Error, Eval, FlatMap, Map, MemoizeSuspend, Now, Suspend}
import monix.eval.internal.TaskRunLoop._
import monix.eval.{Callback, Task}
import monix.execution.Scheduler
import monix.execution.internal.collection.ArrayStack
import monix.execution.misc.NonFatal

import scala.concurrent.blocking
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.{Failure, Success}

private[eval] object TaskRunSyncUnsafe {
/** Run-loop specialization that evaluates the given task and blocks for the result
* if the given task is asynchronous.
*/
def apply[A](source: Task[A], timeout: Duration, scheduler: Scheduler, opts: Task.Options): A = {
var current = source.asInstanceOf[Task[Any]]
var bFirst: Bind = null
var bRest: CallStack = null
// Values from Now, Always and Once are unboxed in this var, for code reuse
var hasUnboxed: Boolean = false
var unboxed: AnyRef = null

do {
current match {
case FlatMap(fa, bindNext) =>
if (bFirst ne null) {
if (bRest eq null) bRest = new ArrayStack()
bRest.push(bFirst)
}
bFirst = bindNext
current = fa

case Now(value) =>
unboxed = value.asInstanceOf[AnyRef]
hasUnboxed = true

case Eval(thunk) =>
try {
unboxed = thunk().asInstanceOf[AnyRef]
hasUnboxed = true
} catch {
case e if NonFatal(e) =>
current = Error(e)
}

case bindNext @ Map(fa, _, _) =>
if (bFirst ne null) {
if (bRest eq null) bRest = new ArrayStack()
bRest.push(bFirst)
}
bFirst = bindNext
current = fa

case Suspend(thunk) =>
// Try/catch described as statement to prevent ObjectRef ;-)
try {
current = thunk()
}
catch {
case ex if NonFatal(ex) => current = Error(ex)
}

case Error(error) =>
findErrorHandler(bFirst, bRest) match {
case null => throw error
case bind =>
// Try/catch described as statement to prevent ObjectRef ;-)
try { current = bind.recover(error) }
catch { case e if NonFatal(e) => current = Error(e) }
bFirst = null
}

case Async(register) =>
return blockForResult(current, register, timeout, scheduler, opts, bFirst, bRest)

case ref: MemoizeSuspend[_] =>
// Already processed?
ref.value match {
case Some(materialized) =>
materialized match {
case Success(value) =>
unboxed = value.asInstanceOf[AnyRef]
hasUnboxed = true
case Failure(error) =>
current = Error(error)
}
case None =>
return blockForResult(current, null, timeout, scheduler, opts, bFirst, bRest)
}
}

if (hasUnboxed) {
popNextBind(bFirst, bRest) match {
case null =>
return unboxed.asInstanceOf[A]
case bind =>
// Try/catch described as statement to prevent ObjectRef ;-)
try {
current = bind(unboxed)
} catch {
case ex if NonFatal(ex) => current = Error(ex)
}
hasUnboxed = false
unboxed = null
bFirst = null
}
}
} while (true)
// $COVERAGE-OFF$
throw new IllegalStateException("out of loop")
// $COVERAGE-ON$
}

private def blockForResult[A](
source: Current,
register: (Context, Callback[Any]) => Unit = null,
limit: Duration,
scheduler: Scheduler,
opts: Task.Options,
bFirst: Bind,
bRest: CallStack): A = {

val latch = new OneShotLatch
val cb = new BlockingCallback[Any](latch)
val context = Context(scheduler, opts)

// Starting actual execution
if (register ne null) {
val rcb = new RestartCallback(context, cb)
executeAsyncTask(context, register, cb, rcb, bFirst, bRest, 1)
} else {
val fa = source.asInstanceOf[Task[A]]
startFull(fa, context, cb, null, bFirst, bRest, 1)
}

val isFinished = limit match {
case e if e eq Duration.Undefined =>
throw new IllegalArgumentException("Cannot wait for Undefined period")
case Duration.Inf =>
blocking(latch.acquireSharedInterruptibly(1))
true
case f: FiniteDuration if f > Duration.Zero =>
blocking(latch.tryAcquireSharedNanos(1, f.toNanos))
case _ =>
false
}

if (isFinished)
cb.value.asInstanceOf[A]
else
throw new TimeoutException(s"Task.runSyncUnsafe($limit)")
}

private final class BlockingCallback[A](latch: OneShotLatch)
extends Callback [A] {

private[this] var success: A = _
private[this] var error: Throwable = _

def value: A =
error match {
case null => success
case e => throw e
}

def onSuccess(value: A): Unit = {
success = value
latch.releaseShared(1)
}

def onError(ex: Throwable): Unit = {
error = ex
latch.releaseShared(1)
}
}

private final class OneShotLatch extends AbstractQueuedSynchronizer {
override protected def tryAcquireShared(ignored: Int): Int =
if (getState != 0) 1 else -1

override protected def tryReleaseShared(ignore: Int): Boolean = {
setState(1)
true
}
}
}
33 changes: 31 additions & 2 deletions monix-eval/jvm/src/test/scala/monix/eval/TaskBlockingSuite.scala
Expand Up @@ -19,11 +19,12 @@ package monix.eval

import minitest.SimpleTestSuite
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await

import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration._

object TaskBlockingSuite extends SimpleTestSuite {
test("blocking should work") {
test("blocking on future should work") {
val source1 = Task(100)
val source2 = Task(200).onErrorHandleWith { case e: Exception => Task.raiseError(e) }

Expand All @@ -35,4 +36,32 @@ object TaskBlockingSuite extends SimpleTestSuite {
val result = Await.result(derived.runAsync, 10.seconds)
assertEquals(result, 300)
}

test("blocking on async") {
for (_ <- 0 until 1000) {
val task = Task(1)
assertEquals(task.runSyncUnsafe(Duration.Inf), 1)
}
}

test("blocking on async.flatMap") {
for (_ <- 0 until 1000) {
val task = Task(1).flatMap(_ => Task(2))
assertEquals(task.runSyncUnsafe(Duration.Inf), 2)
}
}

test("blocking on memoize") {
for (_ <- 0 until 1000) {
val task = Task(1).flatMap(_ => Task(2)).memoize
assertEquals(task.runSyncUnsafe(Duration.Inf), 2)
assertEquals(task.runSyncUnsafe(Duration.Inf), 2)
}
}

test("timeout exception") {
intercept[TimeoutException] {
Task.never.runSyncUnsafe(100.millis)
}
}
}
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.eval

import cats.Eq
import cats.effect.IO
import cats.effect.laws.discipline.{AsyncTests, EffectTests}
import cats.kernel.laws.discipline.MonoidTests
import cats.laws.discipline.{CoflatMapTests, ParallelTests}
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

/** Type class tests for Task that use an alternative `Eq`, making
* use of Task's `runAsync(callback)`.
*/
object TypeClassLawsForTaskRunSyncUnsafeSuite extends monix.execution.BaseLawsSuite
with ArbitraryInstancesBase {

implicit def equalityTask[A](implicit A: Eq[A]): Eq[Task[A]] =
Eq.instance { (a, b) =>
val ta = Try(a.runSyncUnsafe(5.minutes))
val tb = Try(b.runSyncUnsafe(5.minutes))
equalityTry[A].eqv(ta, tb)
}

implicit def equalityIO[A](implicit A: Eq[A]): Eq[IO[A]] =
Eq.instance { (a, b) =>
val ta = Try(a.unsafeRunSync())
val tb = Try(b.unsafeRunSync())
equalityTry[A].eqv(ta, tb)
}

checkAll("CoflatMap[Task]",
CoflatMapTests[Task].coflatMap[Int,Int,Int])

checkAll("Async[Task]",
AsyncTests[Task].async[Int,Int,Int])

checkAll("Effect[Task]",
EffectTests[Task].effect[Int,Int,Int])

checkAll("Parallel[Task, Task]",
ParallelTests[Task, Task].parallel[Int, Int])

checkAll("Monoid[Task[Int]]",
MonoidTests[Task[Int]].monoid)
}

0 comments on commit e6252c4

Please sign in to comment.