/
Backend.scala
95 lines (76 loc) · 2.37 KB
/
Backend.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package sample
import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.actor.ActorRef
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.Routing
import akka.actor.PoisonPill
import akka.config.Supervision
import akka.actor.ReceiveTimeout
object Backend {
case class TranslationRequest(text: String)
case class TranslationResponse(text: String, words: Int)
val backendDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("backend-dispatcher")
.setCorePoolSize(7)
.build
val translationService = loadBalanced(10, actorOf[TranslationService])
private def loadBalanced(poolSize: Int, actor: ⇒ ActorRef): ActorRef = {
val workers = Vector.fill(poolSize)(actor.start())
Routing.loadBalancerActor(CyclicIterator(workers)).start()
}
class TranslationService extends Actor {
self.dispatcher = backendDispatcher
val translator = loadBalanced(4, actorOf[Translator])
val counter = loadBalanced(4, actorOf[Counter])
def receive = {
case TranslationRequest(text) ⇒
for (replyTo ← self.sender) {
val aggregator = actorOf(new Aggregator(replyTo)).start()
translator.tell(text, aggregator)
counter.tell(text, aggregator)
}
}
}
class Aggregator(replyTo: ActorRef) extends Actor {
self.dispatcher = backendDispatcher
self.lifeCycle = Supervision.Temporary
self.receiveTimeout = Some(1000)
var textResult: Option[String] = None
var lengthResult: Option[Int] = None
def receive = {
case text: String ⇒
textResult = Some(text)
replyWhenDone()
case length: Int ⇒
lengthResult = Some(length)
replyWhenDone()
case ReceiveTimeout ⇒
self.stop()
}
def replyWhenDone() {
for (text ← textResult; length ← lengthResult) {
replyTo ! TranslationResponse(text, length)
self.stop()
}
}
}
class Translator extends Actor {
self.dispatcher = backendDispatcher
def receive = {
case x: String ⇒
Thread.sleep(100)
val result = x.toUpperCase
self.channel ! result
}
}
class Counter extends Actor {
self.dispatcher = backendDispatcher
def receive = {
case x: String ⇒
Thread.sleep(100)
val result = x.split(" ").length
self.channel ! result
}
}
}