/
PimpedFuture.scala
134 lines (104 loc) · 3.9 KB
/
PimpedFuture.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package lila
import play.api.libs.concurrent.Execution.Implicits._
import scala.concurrent.duration._
import scala.concurrent.{ Future, ExecutionContext }
object PimpedFuture {
private type Fu[A] = Future[A]
object DirectExecutionContext extends ExecutionContext {
override def execute(command: Runnable): Unit = command.run()
override def reportFailure(cause: Throwable): Unit =
throw new IllegalStateException("lila DirectExecutionContext failure", cause)
}
final class LilaPimpedFuture[A](val fua: Fu[A]) extends AnyVal {
def dmap[B](f: A => B): Fu[B] = fua.map(f)(DirectExecutionContext)
def dforeach[B](f: A => Unit): Unit = fua.foreach(f)(DirectExecutionContext)
def >>-(sideEffect: => Unit): Fu[A] = fua andThen {
case _ => sideEffect
}
def >>[B](fub: => Fu[B]): Fu[B] = fua flatMap (_ => fub)
def void: Fu[Unit] = fua.map(_ => ())(DirectExecutionContext)
def inject[B](b: => B): Fu[B] = fua.map(_ => b)(DirectExecutionContext)
def injectAnyway[B](b: => B): Fu[B] = fold(_ => b, _ => b)
def effectFold(fail: Exception => Unit, succ: A => Unit) {
fua onComplete {
case scala.util.Failure(e: Exception) => fail(e)
case scala.util.Failure(e) => throw e // Throwables
case scala.util.Success(e) => succ(e)
}
}
def fold[B](fail: Exception => B, succ: A => B): Fu[B] =
fua map succ recover { case e: Exception => fail(e) }
def flatFold[B](fail: Exception => Fu[B], succ: A => Fu[B]): Fu[B] =
fua flatMap succ recoverWith { case e: Exception => fail(e) }
def logFailure(logger: => lila.log.Logger, msg: Exception => String): Fu[A] =
addFailureEffect { e => logger.warn(msg(e), e) }
def logFailure(logger: => lila.log.Logger): Fu[A] = logFailure(logger, _.toString)
def addFailureEffect(effect: Exception => Unit) = {
fua onFailure {
case e: Exception => effect(e)
}
fua
}
def addEffect(effect: A => Unit): Fu[A] = {
fua foreach effect
fua
}
def addEffects(fail: Exception => Unit, succ: A => Unit): Fu[A] = {
fua onComplete {
case scala.util.Failure(e: Exception) => fail(e)
case scala.util.Failure(e) => throw e // Throwables
case scala.util.Success(e) => succ(e)
}
fua
}
def addEffectAnyway(inAnyCase: => Unit): Fu[A] = {
fua onComplete {
case _ => inAnyCase
}
fua
}
def mapFailure(f: Exception => Exception) = fua recover {
case cause: Exception => throw f(cause)
}
def prefixFailure(p: => String) = mapFailure { e =>
base.LilaException(s"$p ${e.getMessage}")
}
def thenPp: Fu[A] = {
effectFold(
e => println("[failure] " + e),
a => println("[success] " + a)
)
fua
}
def thenPp(msg: String): Fu[A] = {
effectFold(
e => println(s"[$msg] [failure] $e"),
a => println(s"[$msg] [success] $a")
)
fua
}
def await(duration: FiniteDuration): A =
scala.concurrent.Await.result(fua, duration)
def awaitOrElse(duration: FiniteDuration, default: => A): A = try {
scala.concurrent.Await.result(fua, duration)
} catch {
case _: Exception => default
}
def awaitSeconds(seconds: Int): A =
await(seconds.seconds)
def withTimeout(duration: FiniteDuration, error: => Throwable)(implicit system: akka.actor.ActorSystem): Fu[A] = {
Future firstCompletedOf Seq(
fua,
akka.pattern.after(duration, system.scheduler)(Future failed error)
)
}
def withTimeoutDefault(duration: FiniteDuration, default: => A)(implicit system: akka.actor.ActorSystem): Fu[A] = {
Future firstCompletedOf Seq(
fua,
akka.pattern.after(duration, system.scheduler)(Future(default))
)
}
def chronometer = lila.common.Chronometer(fua)
def mon(path: lila.mon.RecPath) = chronometer.mon(path).result
}
}