/
ClassLoaderActivitySource.scala
47 lines (41 loc) · 1.52 KB
/
ClassLoaderActivitySource.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.twitter.io
import com.twitter.util._
import java.util.concurrent.atomic.AtomicBoolean
/**
 * An ActivitySource for ClassLoader resources.
 *
 * Each call to `get` returns a fresh [[Activity]] that starts out as
 * `Activity.Pending`. The resource is looked up and read on `pool` the first
 * time the Activity is observed, and the outcome is published as either
 * `Activity.Ok` or `Activity.Failed`.
 *
 * @param classLoader the loader whose `getResourceAsStream` is used to locate
 *                    resources
 * @param pool        the pool on which the resource lookup is scheduled; it is
 *                    also handed to the `InputStreamReader` that reads the
 *                    stream
 */
class ClassLoaderActivitySource private[io] (classLoader: ClassLoader, pool: FuturePool)
    extends ActivitySource[Buf] {

  /** Creates a source that performs its loading on `FuturePool.unboundedPool`. */
  private[io] def this(classLoader: ClassLoader) = this(classLoader, FuturePool.unboundedPool)

  /**
   * Returns an [[Activity]] holding the contents of the named resource.
   *
   * @param name the resource name passed to `ClassLoader.getResourceAsStream`
   * @return an Activity that is `Pending` until first observed and then
   *         becomes `Ok(buf)` with the resource's bytes,
   *         `Failed(ActivitySource.NotFound)` when the loader returns `null`,
   *         or `Failed(cause)` when looking up or reading the resource fails
   */
  def get(name: String): Activity[Buf] = {
    // The load is attempted at most once per returned Activity: `runOnce`
    // guards the submission and `p` carries the single final state. (The
    // original author assumed ClassLoader resources do not change.)
    val runOnce = new AtomicBoolean(false)
    val p = new Promise[Activity.State[Buf]]

    // Defer loading until the first observation
    val v = Var.async[Activity.State[Buf]](Activity.Pending) { value =>
      if (runOnce.compareAndSet(false, true)) {
        val loading = pool {
          classLoader.getResourceAsStream(name) match {
            case null => p.setValue(Activity.Failed(ActivitySource.NotFound))
            case stream =>
              val reader =
                new InputStreamReader(stream, InputStreamReader.DefaultMaxBufferSize, pool)
              BufReader.readAll(reader) respond {
                case Return(buf) =>
                  p.setValue(Activity.Ok(buf))
                case Throw(cause) =>
                  p.setValue(Activity.Failed(cause))
              } ensure {
                // InputStreamReader ignores the deadline in close
                reader.close(Time.Undefined)
              }
          }
        }

        // If the pooled task itself fails (the lookup threw, or the pool
        // refused the work) neither branch above has satisfied `p`. Without
        // this the Activity would stay Pending forever, since `runOnce`
        // prevents any retry. `updateIfEmpty` keeps this a no-op when a state
        // was already published.
        loading.onFailure { cause =>
          p.updateIfEmpty(Return(Activity.Failed(cause)))
          ()
        }
      }

      // Every observation (not only the first) mirrors the final state into
      // its updatable once it is available.
      p.onSuccess(value() = _)
      Closable.nop
    }

    Activity(v)
  }
}