/
DistributeRevereStringActor.scala
92 lines (70 loc) · 2.64 KB
/
DistributeRevereStringActor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package examples
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Stash}
import akka.routing.{RoundRobinGroup, RoundRobinPool}
import examples.DistributeRevereStringWithRoundRobinActor.Slaves
import net.rollercoders.akka.awp.`trait`.AWP
/**
  * Coordinator that reverses a string by splitting it into parts, sending each part to a
  * round-robin router of [[SlaveActor]] workers, and stitching the reversed parts together.
  *
  * States:
  *  - `waiting`: stashes every message until [[DistributeRevereStringWithRoundRobinActor.Init]]
  *    arrives, then switches to `ready` and replays the stash.
  *  - `ready`: runs one `Exec` job at a time. `Exec` requests that arrive while a job is
  *    still collecting replies are stashed and replayed once that job has finished, so the
  *    replies of two jobs can never be mixed.
  *
  * Note: if a worker reply is lost the current job never completes and later `Exec`
  * requests stay stashed; no timeout is implemented here.
  *
  * @param slaves how the workers are obtained: a pool of freshly created children or a
  *               group of already running actors addressed by path
  */
class DistributeRevereStringWithRoundRobinActor(slaves: Slaves) extends Actor with ActorLogging with Stash with AWP {
  import DistributeRevereStringWithRoundRobinActor._
  import SlaveActor.Reverse

  /** Builds the router described by `slaves` and announces it to `awpSelf` with an `Init`. */
  override def preStart(): Unit = {
    val (router, slaveCount) = slaves match {
      case Pool(pool) =>
        (context.actorOf(RoundRobinPool(pool).props(Props[SlaveActor]())), pool)
      case Group(paths) =>
        (context.actorOf(RoundRobinGroup(paths).props()), paths.length)
    }
    awpSelf ! Init(router, slaveCount)
  }

  override def receive: Receive = waiting()

  /** Initial behaviour: hold back everything until the router is known. */
  def waiting(): Receive = {
    case Init(router, slaveCount) =>
      log.info(s"[Init] poolSize: $slaveCount")
      context.become(ready(router))
      unstashAll()
    case _ =>
      stash()
  }

  /**
    * Working behaviour.
    *
    * @param router  router the string parts are sent to
    * @param left    number of `Sorted` replies still expected for the current job; 0 means idle
    * @param results `(reversedPart, partIndex)` pairs collected so far for the current job
    */
  def ready(router: ActorRef, left: Int = 0, results: Vector[(String, Int)] = Vector.empty): Receive = {
    case Exec(_, _) if left > 0 =>
      // A job is still collecting replies: starting another one now would overwrite the
      // counter and mix both jobs' parts. Defer it until the current job is done.
      stash()

    case Exec(_, part) if part <= 0 =>
      // part == 0 would divide by zero below; a negative part would dispatch nothing and
      // leave the actor waiting for replies that never come.
      log.warning(s"[EXEC] Ignoring request: part must be positive but was $part")

    case Exec(str, part) =>
      log.info(s"[EXEC] Received long string to split in $part part")
      val len = str.length / part
      (0 until part).foreach { index =>
        val start = len * index
        // The last part absorbs the remainder when the length is not divisible by `part`.
        val end = if (index == part - 1) str.length else len * (index + 1)
        router ! Reverse(str.substring(start, end), index)
      }
      // Always start from an empty result set so nothing from an earlier job leaks in.
      context.become(ready(router, part))

    case Sorted(sorted, id) if left == 1 =>
      // Every part is already reversed; emitting the parts from highest to lowest index
      // therefore yields the reversal of the whole input.
      val result = (results :+ (sorted, id))
        .sortBy(_._2)(Ordering[Int].reverse)
        .map(_._1)
        .mkString("")
      // Reset immediately instead of waiting for `Result` to travel through the mailbox,
      // then release any `Exec` requests that were deferred while this job was running.
      context.become(ready(router))
      unstashAll()
      awpSelf ! Result(result)

    case Sorted(sorted, id) if left > 1 =>
      context.become(ready(router, left - 1, results :+ (sorted, id)))

    case Sorted(_, id) =>
      // No job is in flight, so this reply cannot belong to anything; dropping it keeps the
      // counter from going negative.
      log.warning(s"[SORTED] Ignoring unexpected part $id: no job in progress")

    case Result(result) =>
      // State was already reset when the last part arrived; a deferred job may be running
      // by now, so this handler must only log.
      log.info(s"[RESULT] $result")
  }

  /** Reference used for the messages this actor addresses to itself; bound to `self` here. */
  override implicit val awpSelf: ActorRef = self
}
/** Configuration types and message protocol of [[DistributeRevereStringWithRoundRobinActor]]. */
object DistributeRevereStringWithRoundRobinActor {

  /** Describes how the coordinator obtains its workers. */
  sealed trait Slaves

  /** Use a round-robin pool of `pool` newly created worker children. */
  final case class Pool(pool: Int) extends Slaves

  /** Use a round-robin group over the already running actors found at `paths`. */
  final case class Group(paths: List[String]) extends Slaves

  /** Sent by the coordinator to itself once its router exists; `slaveCount` is only logged. */
  final case class Init(router: ActorRef, slaveCount: Int)

  /** Request to reverse `str` by splitting it into `part` pieces. */
  final case class Exec(str: String, part: Int)

  /**
    * Worker reply carrying one processed piece.
    *
    * @param sorted the reversed piece (despite the name, nothing is sorted)
    * @param id     index of the piece within the original string
    */
  final case class Sorted(sorted: String, id: Int)

  /** Not referenced in this file; presumably used by other code (TODO confirm). */
  final case class Start(children: Int)

  /** Carries the fully reversed string once every piece has been collected. */
  final case class Result(result: String)
}
/**
  * Worker that reverses the string carried by a `Reverse` request and answers the sender
  * with a `Sorted` message holding the reversed text and the same id.
  */
class SlaveActor() extends Actor {
  import DistributeRevereStringWithRoundRobinActor.Sorted
  import SlaveActor._

  override def receive: Receive = {
    case Reverse(payload, partIndex) =>
      val reversed = payload.reverse
      // Echo the id back so the requester can put the pieces in order again.
      sender() ! Sorted(reversed, partIndex)
  }
}
/** Message protocol understood by [[SlaveActor]]. */
object SlaveActor {

  /**
    * Request to reverse a piece of text.
    *
    * @param str text to reverse
    * @param id  identifier echoed back in the reply
    */
  final case class Reverse(str: String, id: Int)
}