-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathMain.scala
More file actions
69 lines (59 loc) · 1.96 KB
/
Main.scala
File metadata and controls
69 lines (59 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package example
import akka.actor.{ActorSystem, Props}
import akka.persistence.PersistentActor
import akka.persistence.journal.Tagged
import com.typesafe.config.ConfigFactory
/**
 * Message asking [[MyPersistentActor]] to add a value to its running sum.
 *
 * Declared `final` so it cannot be subclassed, which would undermine the
 * generated `equals`/`hashCode`/pattern-matching behaviour.
 *
 * @param i the amount to add
 */
final case class Command(i: Int)
/**
 * Event that [[MyPersistentActor]] persists for each accepted [[Command]] and
 * replays during recovery to rebuild its running sum.
 *
 * NOTE(review): instances are written to the journal; if they are stored with
 * Java serialization, changing this class's shape or modifiers may make
 * previously stored events unreadable — confirm the configured serializer
 * before altering it.
 *
 * @param i the amount that was added
 */
case class Event(i: Int)
/**
 * Persistent actor that maintains a running sum of integers.
 *
 * Every [[Command]] is turned into an [[Event]] and handed to `persist`; the
 * sum is updated in the persist callback and is rebuilt from replayed events
 * in `receiveRecover`. Each step is traced to stdout with `println`.
 */
class MyPersistentActor extends PersistentActor {

  /** Running total of the values of all events applied so far. */
  var sum: Int = 0

  /** Fixed journal identifier; every instance of this class shares it. */
  override val persistenceId: String = "myPid"

  /**
   * Applies a single event to the actor state.
   *
   * This is the only place where `sum` is changed, so recovery and the
   * persist callback cannot drift apart.
   */
  private def applyEvent(event: Event): Unit =
    sum += event.i

  /**
   * Rebuilds `sum` from replayed events. Events may arrive either bare or
   * wrapped in [[akka.persistence.journal.Tagged]]; both forms are applied
   * the same way.
   */
  override def receiveRecover: Receive = {
    case event: Event =>
      println(s"receiveRecover : Recovering an event = Event(${event.i})")
      applyEvent(event)
      println(s"receiveRecover : current state = $sum")

    case Tagged(event: Event, tags) =>
      println(s"receiveRecover : Recovering from Tagged(Event(${event.i}), $tags)")
      applyEvent(event)
      println(s"receiveRecover : current state = $sum")
  }

  /**
   * Handles live messages: a [[Command]] is persisted as an [[Event]], and the
   * literal string "kaboom" makes the actor throw.
   */
  override def receiveCommand: Receive = {
    case Command(i) =>
      println(s"receiveCommand : Received Command($i)")
      persist(Event(i)) { event =>
        println(s"persist callback: Event = Event(${event.i}) persisted")
        // Update state from the persisted event itself rather than from the
        // command value captured by the closure.
        applyEvent(event)
        println(s"persist callback: current state = $sum")
      }

    case "kaboom" =>
      // Deliberate failure so the example can demonstrate what happens to the
      // state after the actor crashes.
      throw new Exception("exploded!")
  }
}
/**
 * Example driver: starts an actor system, sends a few commands to a
 * [[MyPersistentActor]], makes it throw, sends more commands, and finally
 * terminates the system.
 */
object Main {

  /**
   * Runs the example scenario.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("exampleSystem", ConfigFactory.load())

    try {
      val persistentActor = actorSystem.actorOf(Props(new MyPersistentActor), "p1")

      // First batch: Command(1), Command(2), Command(3), in that order.
      (1 to 3).foreach(n => persistentActor ! Command(n))

      // The akka-persistence-cassandra plugin buffers writes internally. If a
      // persistent actor throws and restarts before that buffer has been
      // flushed to Cassandra, sequence numbers can become inconsistent.
      //
      // Configuration apparently offers a way around this, but pausing here
      // was the simpler choice for an example.
      Thread.sleep(3000)

      // Make the actor throw, then keep sending commands to it.
      persistentActor ! "kaboom"
      Seq(4, 5).foreach(n => persistentActor ! Command(n))

      // Leave time for the remaining messages to be processed before shutdown.
      Thread.sleep(3000)
    } finally {
      actorSystem.terminate()
    }
  }
}