/
AppendReadChannel.scala
72 lines (51 loc) · 1.76 KB
/
AppendReadChannel.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package gopher.impl
import gopher._
import scala.util._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
/**
* Input which reads from the first channel and, after the first channel is closed, from the second.
*
* can be created with 'append' operator.
*
* {{{
* val x = read(x|y)
* }}}
*/
case class AppendReadChannel[F[_],A](x: ReadChannel[F,A], y: ReadChannel[F,A]) extends ReadChannel[F,A]:

  /** The API instance of the first channel is used for the combined channel. */
  override def gopherApi: Gopher[F] = x.gopherApi

  /**
   * Becomes `true` once a reader registered on `x` has been completed with a
   * failure carrying a `ChannelClosedException`; from then on `addReader`
   * registers readers directly on `y`.
   */
  val xClosed: AtomicBoolean = new AtomicBoolean(false)

  /**
   * Reader placed on `x` on behalf of `nested`.
   *
   * Expiration queries and `markFree` are delegated to `nested`. `markUsed`
   * is only recorded locally in `inUsage`; it is passed on to `nested` from
   * inside the captured callback, and only when the result is actually
   * delivered to `nested`.
   *
   * NOTE(review): whether the underlying channel calls `markUsed()` before or
   * after invoking the captured callback is not visible in this file. If it is
   * called after, `inUsage` is still `false` when the callback reads it.
   * Confirm against the `Reader` contract.
   */
  class InterceptReader(nested: Reader[A]) extends Reader[A]:

    /** Set by `markUsed()`; read by the callback produced in `capture()`. */
    val inUsage: AtomicBoolean = new AtomicBoolean(false)

    def canExpire: Boolean = nested.canExpire

    def isExpired: Boolean = nested.isExpired

    /**
     * Captures `nested` and wraps its callback:
     *  - a success, or a failure that is not a `ChannelClosedException`, is
     *    handed to the nested callback (after forwarding `markUsed()` when
     *    `inUsage` is set);
     *  - a `ChannelClosedException` failure is not delivered: `xClosed` is
     *    set, `nested` is freed and re-registered on `y`.
     */
    def capture(): Expirable.Capture[Try[A]=>Unit] =
      nested.capture().map { deliver =>
        {
          case ok @ Success(_) =>
            if inUsage.get() then nested.markUsed()
            deliver(ok)
          case Failure(_: ChannelClosedException) =>
            // The first channel is exhausted: release the captured reader and
            // let it wait on the second channel instead.
            xClosed.set(true)
            nested.markFree()
            y.addReader(nested)
          case failed @ Failure(_) =>
            if inUsage.get() then nested.markUsed()
            deliver(failed)
        }
      }

    def markUsed(): Unit =
      inUsage.set(true)

    def markFree(): Unit =
      nested.markFree()

  end InterceptReader

  /**
   * Registers `reader` on `y` when `x` is already known to be closed;
   * otherwise registers it on `x` wrapped in an [[InterceptReader]].
   */
  def addReader(reader: Reader[A]): Unit =
    if xClosed.get() then y.addReader(reader)
    else x.addReader(new InterceptReader(reader))

  /** Done-readers are registered on the second channel only. */
  def addDoneReader(reader: Reader[Unit]): Unit =
    y.addDoneReader(reader)