/
Helpers.scala
executable file
·103 lines (85 loc) · 2.54 KB
/
Helpers.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* Copyright (C) 2007-2008 Scala OTP Team
*/
package scala.actors.behavior
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.actors._
import scala.actors.Actor._
import net.lag.logging.{Logger, Config}
/**
 * Unchecked exception that wraps another throwable as its cause.
 *
 * @param cause the underlying throwable; it is handed straight to the
 *              `RuntimeException(Throwable)` constructor
 */
class SystemFailure(cause: Throwable)
  extends RuntimeException(cause)
/**
 * Base trait for any class that wants to use the logging infrastructure.
 *
 * Mixing it in provides a `log` member obtained from `Logger.get`, keyed by the
 * runtime class name of the concrete instance.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
trait Logging {
  // Evaluated first, purely for its side effect: makes sure the config system
  // is bootstrapped before a logger is requested.
  Config.config

  // Not serialized with the owning instance.
  @transient val log = Logger.get(getClass.getName)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
object Helpers extends Logging {
// ================================================
/**
 * Small convenience wrapper around a `ReentrantReadWriteLock` that runs a block
 * of code while holding either the read lock or the write lock.
 */
class ReadWriteLock {
  private val underlying = new ReentrantReadWriteLock
  private val shared = underlying.readLock()
  private val exclusive = underlying.writeLock()

  /**
   * Evaluates `body` while holding the write lock.
   *
   * The lock is released in a `finally` clause, so it is given back even when
   * `body` throws; the exception then propagates to the caller.
   *
   * @param body the code to run under the write lock (by-name, evaluated once)
   * @return whatever `body` evaluates to
   */
  def withWriteLock[T](body: => T): T = {
    exclusive.lock()
    try {
      body
    } finally {
      exclusive.unlock()
    }
  }

  /**
   * Evaluates `body` while holding the read lock.
   *
   * The lock is released in a `finally` clause, so it is given back even when
   * `body` throws; the exception then propagates to the caller.
   *
   * @param body the code to run under the read lock (by-name, evaluated once)
   * @return whatever `body` evaluates to
   */
  def withReadLock[T](body: => T): T = {
    shared.lock()
    try {
      body
    } finally {
      shared.unlock()
    }
  }
}
// ================================================
// Implicit conversion from a regular actor to an actor with a typed future.
// It wraps the actor in an ActorWithTypedFuture, which is what makes
// `actor !!! msg` available on a plain Actor.
// The result type is spelled out explicitly: an implicit without a declared result
// type cannot be applied before its definition point in the file, and newer
// compilers reject implicit definitions that lack one.
implicit def actorWithFuture(a: Actor): ActorWithTypedFuture = new ActorWithTypedFuture(a)
/**
 * A `Future` over the given input channel that can also be asked for its
 * value with a bounded wait.
 *
 * @param ch the channel the reply is read from; passed on to `Future`
 */
abstract class FutureWithTimeout[T](ch: InputChannel[Any]) extends Future[T](ch) {

  /**
   * Tries to obtain the future's value, waiting at most `timeout`.
   *
   * @param timeout maximum wait (presumably milliseconds, as with the actor
   *                library's own `receiveWithin` -- TODO confirm)
   * @return the value wrapped in `Some` if one is available, otherwise `None`
   */
  def receiveWithin(timeout: Int): Option[T]
}
/**
 * Waits for the reply of `future` and falls back to `errorHandler` when none arrives.
 *
 * @param future       the future to wait on (by-name; evaluated once, when the wait starts)
 * @param timeout      passed unchanged to `FutureWithTimeout.receiveWithin`
 * @param errorHandler fallback result (by-name); evaluated only when
 *                     `receiveWithin` yields `None`
 * @return the reply carried by the future, or the result of `errorHandler`
 */
def receiveOrFail[T](future: => FutureWithTimeout[T], timeout: Int, errorHandler: => T): T =
  future.receiveWithin(timeout).getOrElse(errorHandler)
/**
 * Wrapper around an actor that adds the `!!!` operation: send a message and
 * get back a [[FutureWithTimeout]] for the reply.
 *
 * @param a the actor to send to; must not be null (checked with `require`)
 */
class ActorWithTypedFuture(a: Actor) {
  require(a != null)

  /**
   * Sends `msg` to the wrapped actor, using a freshly created channel bound to
   * `Actor.self` as the reply destination, and returns a future reading from
   * that channel.
   *
   * NOTE: `A` is erased at runtime, so the `case _: A` patterns below are
   * unchecked: any message that arrives on the channel (other than `TIMEOUT`
   * in `receiveWithin`) is accepted as the reply. A reply of the wrong type
   * therefore surfaces as a `ClassCastException` at the use site, not here.
   *
   * @param msg the message to send
   * @tparam A the type the caller expects the reply to have
   * @return a future for the reply, which can be waited on with or without a timeout
   */
  def !!![A](msg: Any): FutureWithTimeout[A] = {
    val ftch = new Channel[Any](Actor.self)
    a.send(msg, ftch)
    new FutureWithTimeout[A](ftch) {
      // Returns the reply if one is already available (this polls the channel
      // via isSet); otherwise waits on the channel with no timeout.
      def apply() =
        if (isSet) value.get
        else ch.receive {
          case reply: A =>
            value = Some(reply)
            value.get
        }

      // Not a pure accessor: polls the channel with a zero timeout, which may
      // cache an arrived reply in `value` and logs the timeout message when
      // nothing is available.
      def isSet = receiveWithin(0).isDefined

      // Returns the cached value if there is one; otherwise waits on the
      // channel for at most `timeout` and caches whatever reply arrives.
      def receiveWithin(timeout: Int): Option[A] = value match {
        case None => ch.receiveWithin(timeout) {
          case TIMEOUT =>
            // net.lag.logging formats its arguments printf-style, so the
            // placeholder has to be %s; with the previous "{}" the actor was
            // never interpolated into the message.
            log.debug("Future timed out while waiting for actor: %s", a)
            None
          case reply: A =>
            value = Some(reply)
            value
        }
        case cached: Some[A] => cached
      }
    }
  }
}
}