-
Notifications
You must be signed in to change notification settings - Fork 7
/
MappedReadChannel.scala
94 lines (62 loc) · 2.32 KB
/
MappedReadChannel.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package gopher.impl
import gopher._
import scala.util._
import scala.util.control.NonFatal
/**
 * Read channel that exposes the elements of `internal` transformed by the
 * synchronous function `f`.
 *
 * @tparam F effect type of the wrapped channel
 * @tparam A element type of the wrapped channel
 * @tparam B element type seen by readers of this channel
 * @param internal channel the elements are actually read from
 * @param f        mapping applied to every successfully read element
 */
class MappedReadChannel[F[_], A, B](internal: ReadChannel[F, A], f: A => B) extends ReadChannel[F, B] {

  /**
   * Adapter registered with `internal` on behalf of a `Reader[B]`: it accepts
   * `Try[A]` results and hands the mapped `Try[B]` to `nested`.
   * Expiration and usage bookkeeping are delegated to `nested` unchanged.
   *
   * @param nested the reader that finally receives the mapped values
   */
  class MReader(nested: Reader[B]) extends Reader[A] {

    /**
     * Turns a callback expecting `Try[B]` into one accepting `Try[A]`.
     *
     * `fun` is invoked exactly once per result:
     *  - `Success(a)` becomes `Success(f(a))`, or `Failure(ex)` when `f` throws a
     *    non-fatal exception;
     *  - `Failure(ex)` is passed through with the same exception.
     *
     * Only `f` is guarded. An exception thrown by `fun` itself is NOT caught here,
     * so the callback is never called a second time with a `Failure` after it has
     * already been called with a `Success`.
     */
    def wrappedFun(fun: Try[B] => Unit): Try[A] => Unit =
      // Try.map applies `f` only to a Success and converts a NonFatal exception
      // thrown by `f` into a Failure; an incoming Failure is left as it is.
      read => fun(read.map(f))

    // TODO: decide whether errors should be passed on to the next level.
    /** Captures `nested` and adapts the function carried by that capture with [[wrappedFun]]. */
    override def capture(): Expirable.Capture[Try[A] => Unit] =
      nested.capture().map(wrappedFun)

    /** Delegates to `nested`. */
    override def canExpire: Boolean = nested.canExpire

    /** Delegates to `nested`. */
    override def isExpired: Boolean = nested.isExpired

    /** Delegates to `nested`. */
    override def markUsed(): Unit = nested.markUsed()

    /** Delegates to `nested`. */
    override def markFree(): Unit = nested.markFree()
  }

  /** Registers `reader` with the wrapped channel through a mapping [[MReader]]. */
  def addReader(reader: Reader[B]): Unit =
    internal.addReader(MReader(reader))

  /** Forwards the done-reader to the wrapped channel unchanged (no mapping is involved). */
  def addDoneReader(reader: Reader[Unit]): Unit = internal.addDoneReader(reader)

  /** The Gopher API instance of the wrapped channel. */
  def gopherApi: Gopher[F] = internal.gopherApi
}
/**
 * Read channel that exposes the elements of `internal` transformed by the
 * effectful function `f`, whose result is produced inside `F`.
 *
 * @tparam F effect type of the wrapped channel and of the mapping result
 * @tparam A element type of the wrapped channel
 * @tparam B element type seen by readers of this channel
 * @param internal channel the elements are actually read from
 * @param f        effectful mapping applied to every successfully read element
 */
class MappedAsyncReadChannel[F[_], A, B](internal: ReadChannel[F, A], f: A => F[B]) extends ReadChannel[F, B] {

  /** The Gopher API instance of the wrapped channel. */
  def gopherApi: Gopher[F] = internal.gopherApi

  /** Registers `reader` with the wrapped channel through a mapping [[MReader]]. */
  def addReader(reader: Reader[B]): Unit = {
    val adapter = MReader(reader)
    internal.addReader(adapter)
  }

  /** Forwards the done-reader to the wrapped channel unchanged (no mapping is involved). */
  def addDoneReader(reader: Reader[Unit]): Unit = internal.addDoneReader(reader)

  /**
   * Adapter registered with `internal` on behalf of a `Reader[B]`: it accepts
   * `Try[A]` results, runs `f` on successes and hands the outcome to `nested`.
   * Expiration and usage bookkeeping are delegated to `nested` unchanged.
   *
   * @param nested the reader that finally receives the mapped values
   */
  class MReader(nested: Reader[B]) extends Reader[A] {

    /**
     * Turns a callback expecting `Try[B]` into one accepting `Try[A]`.
     *
     *  - `Failure(ex)` is passed to `fun` directly.
     *  - `Success(a)` hands `asyncMonad.mapTry(f(a))(fun)` to
     *    `gopherApi.spawnAndLogFail`; if building that effect throws a non-fatal
     *    exception, `fun` receives it as a `Failure` and a unit effect is used instead.
     *
     * NOTE(review): the guard covers the whole `mapTry` call, so if `mapTry` were to
     * invoke `fun` synchronously and `fun` threw, `fun` would be called again with a
     * `Failure` — confirm against the monad implementation in use.
     */
    def wrappedFun(fun: Try[B] => Unit): Try[A] => Unit = {
      case Failure(cause) =>
        fun(Failure(cause))
      case Success(value) =>
        // Kept as a `def` so the expression is evaluated exactly when
        // `spawnAndLogFail` evaluates its argument, as in the inline form.
        def mappedEffect =
          try
            asyncMonad.mapTry(f(value))(fun)
          catch
            case NonFatal(cause) =>
              fun(Failure(cause))
              asyncMonad.pure(())
        gopherApi.spawnAndLogFail(mappedEffect)
    }

    // TODO: decide whether errors should be passed on to the next level.
    /** Captures `nested` and adapts the function carried by that capture with [[wrappedFun]]. */
    override def capture(): Expirable.Capture[Try[A] => Unit] =
      nested.capture().map(captured => wrappedFun(captured))

    /** Delegates to `nested`. */
    override def canExpire: Boolean = nested.canExpire

    /** Delegates to `nested`. */
    override def isExpired: Boolean = nested.isExpired

    /** Delegates to `nested`. */
    override def markUsed(): Unit = nested.markUsed()

    /** Delegates to `nested`. */
    override def markFree(): Unit = nested.markFree()
  }
}