-
Notifications
You must be signed in to change notification settings - Fork 506
/
LTask.scala
97 lines (85 loc) · 3.26 KB
/
LTask.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*
* Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cats.effect
import cats.Eq
import cats.effect.internals.{Callback, Conversions}
import cats.effect.laws.util.TestContext
import org.scalacheck.{Arbitrary, Cogen}
import scala.concurrent.{ExecutionContext, ExecutionException, Future, Promise}
/**
* Built for testing noncancelable effect data types.
*/
case class LTask[A](run: ExecutionContext => Future[A])
object LTask {
/** For testing laws with ScalaCheck. */
implicit def arbitrary[A](implicit A: Arbitrary[IO[A]]): Arbitrary[LTask[A]] =
Arbitrary(A.arbitrary.map { io => LTask(_ => io.unsafeToFuture()) })
/** For testing laws with ScalaCheck. */
implicit def cogenForLTask[A]: Cogen[LTask[A]] =
Cogen[Unit].contramap(_ => ())
/** For testing laws with ScalaCheck. */
implicit def eqForLTask[A](implicit A: Eq[Future[A]], ec: TestContext): Eq[LTask[A]] =
new Eq[LTask[A]] {
def eqv(x: LTask[A], y: LTask[A]): Boolean = {
val lh = x.run(ec)
val rh = y.run(ec)
ec.tick()
A.eqv(lh, rh)
}
}
/** Instances for `LTask`. */
implicit def effectInstance(implicit ec: ExecutionContext): Effect[LTask] =
new Effect[LTask] {
def pure[A](x: A): LTask[A] =
LTask(_ => Future.successful(x))
def raiseError[A](e: Throwable): LTask[A] =
LTask(_ => Future.failed(e))
def suspend[A](thunk: => LTask[A]): LTask[A] =
LTask { implicit ec => Future.successful(()).flatMap(_ => thunk.run(ec)) }
def async[A](k: (Either[Throwable, A] => Unit) => Unit): LTask[A] =
LTask { implicit ec =>
val p = Promise[A]()
k(r => p.tryComplete(Conversions.toTry(r)))
p.future
}
def runAsync[A](fa: LTask[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] =
IO(fa.run(ec).onComplete { r =>
cb(Conversions.toEither(r)).unsafeRunAsync(Callback.report)
})
def flatMap[A, B](fa: LTask[A])(f: A => LTask[B]): LTask[B] =
LTask { implicit ec =>
Future.successful(()).flatMap { _ =>
fa.run(ec).flatMap { a => f(a).run(ec) }
}
}
def tailRecM[A, B](a: A)(f: A => LTask[Either[A, B]]): LTask[B] =
flatMap(f(a)) {
case Left(a) => tailRecM(a)(f)
case Right(b) => pure(b)
}
def handleErrorWith[A](fa: LTask[A])(f: Throwable => LTask[A]): LTask[A] =
LTask { implicit ec =>
Future.successful(()).flatMap { _ =>
fa.run(ec).recoverWith {
case err: ExecutionException if err.getCause ne null =>
f(err.getCause).run(ec)
case err =>
f(err).run(ec)
}
}
}
}
}