/
Saga.scala
103 lines (82 loc) · 2.91 KB
/
Saga.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package pl.newicom.dddd.process
import akka.persistence.RecoveryCompleted
import pl.newicom.dddd.aggregate._
import pl.newicom.dddd.delivery.protocol.alod._
import pl.newicom.dddd.messaging.MetaAttribute.{Publisher_Type, Reused}
import pl.newicom.dddd.messaging.PublisherTypeValue.BP
import pl.newicom.dddd.messaging.event.EventMessage
import pl.newicom.dddd.messaging.{MetaData, MetaDataPropagationPolicy}
/**
 * Marker event that [[Saga]] persists in place of a received event when `receiveEvent`
 * answers with [[DropEvent]].
 *
 * When a persisted message carrying this marker is applied, the message is still handed to
 * `messageProcessed`, but the marker itself is not passed to `updateState`.
 */
case object EventDroppedMarkerEvent extends DomainEvent
/** Outcome a saga chooses for a received event; the result type of `SagaAbstractStateHandling.receiveEvent`. */
sealed trait SagaAction
/** Persist `e` as the saga's event; once persisted it is applied to the saga through `updateState`. */
case class RaiseEvent(e: DomainEvent) extends SagaAction
/** Drop the received event: [[EventDroppedMarkerEvent]] is persisted instead and `updateState` is not called for it. */
case object DropEvent extends SagaAction
/**
 * State-handling contract that a concrete saga supplies to [[Saga]] (required there via self-type).
 */
trait SagaAbstractStateHandling {
/** Partial function from a received domain event to the [[SagaAction]] to take for it. */
type ReceiveEvent = PartialFunction[DomainEvent, SagaAction]
/**
 * Decides what to do with a received event.
 *
 * NOTE(review): [[Saga]] applies this function directly, without an `isDefinedAt` check, so an
 * event it is not defined for presumably fails with a `MatchError` - confirm this is intended.
 */
def receiveEvent: ReceiveEvent
/**
 * Applies an event to the saga's state. [[Saga]] calls this for persisted (non-dropped) events
 * and for delivery receipts, both during normal processing and during recovery.
 */
def updateState(event: DomainEvent): Unit
/** Guard checked by [[Saga]] before it persists a `Delivered` receipt; receipts are not handled there while this is `false`. */
def initialized: Boolean
}
/**
 * Persistent saga (process manager) behaviour.
 *
 * For every incoming [[EventMessage]] the saga asks `receiveEvent` for a [[SagaAction]],
 * persists either the raised event or [[EventDroppedMarkerEvent]], and only then updates its
 * state and acknowledges the message. `Delivered` receipts are persisted and applied as well.
 *
 * Must be mixed in together with [[SagaAbstractStateHandling]], which supplies
 * `receiveEvent`, `updateState` and `initialized`.
 */
trait Saga extends SagaBase {
  this: SagaAbstractStateHandling =>

  /**
   * Recovery handler: every replayed message except `RecoveryCompleted` goes through the same
   * state-update path that is used after a live `persist`.
   */
  override def receiveRecover: Receive = {
    case _: RecoveryCompleted =>
      // do nothing
    case msg: Any =>
      _updateState(msg)
  }

  /**
   * Command handler.
   *
   *  - [[EventMessage]]: if the message declares a predecessor (`mustFollow`) that has not been
   *    received yet, it is only logged as out of order. Otherwise the action chosen by
   *    `receiveEvent` is turned into an event message, persisted, applied and acknowledged.
   *  - `Delivered` (only while `initialized`): the receipt is persisted and applied.
   */
  override def receiveCommand: Receive = {
    case em: EventMessage =>
      val event = em.event
      // No declared predecessor, or predecessor already received => the event may be handled.
      // NOTE(review): receiveEvent is applied without an isDefinedAt check, so an event it is
      // not defined for presumably raises a MatchError here - confirm this is intended.
      val actionMaybe: Option[SagaAction] =
        if (em.mustFollow.forall(id => wasReceived(id))) Option(receiveEvent(event))
        else None

      actionMaybe match {
        case None =>
          log.debug("Message out of order detected: {}", em.id)

        case Some(action) =>
          val eventToPersist = action match {
            case RaiseEvent(raisedEvent) => raisedEvent
            case DropEvent               => EventDroppedMarkerEvent
          }
          // `Reused` records whether the saga stores the very event it received.
          val emToPersist = EventMessage(eventToPersist).withMetaData(
            MetaDataPropagationPolicy.onEventAcceptedByPM(
              receivedEvent = em.metadata,
              eventToStore = MetaData(Publisher_Type -> BP, Reused -> (eventToPersist == event))
            ))
          persist(emToPersist) { persisted =>
            log.debug("Event message persisted: {}", persisted)
            _updateState(persisted)
            acknowledgeEvent(persisted)
          }
          // Called right after `persist` is invoked, i.e. not from within the persist handler.
          onEventReceived(em, action)
      }

    case receipt: Delivered if initialized =>
      persist(EventMessage(receipt).withPublisherType(BP))(_updateState)
  }

  /**
   * Applies a persisted or replayed message to the saga.
   *
   * A `Delivered` receipt confirms the delivery and is passed to `updateState`. Any other
   * event message is handed to `messageProcessed`; its event is passed to `updateState`
   * unless it is the [[EventDroppedMarkerEvent]] marker.
   *
   * NOTE(review): there is no fallback case, so any other kind of message fails with a
   * `MatchError` - confirm that nothing else can reach this method during recovery.
   */
  private def _updateState(msg: Any): Unit =
    msg match {
      case EventMessage(_, receipt: Delivered) =>
        confirmDelivery(receipt.deliveryId)
        updateState(receipt)

      case em: EventMessage =>
        val storedEvent = em.event
        messageProcessed(em)
        storedEvent match {
          case EventDroppedMarkerEvent => () // marker only: nothing to apply to the state
          case domainEvent             => updateState(domainEvent)
        }
    }

  /**
   * Hook invoked once an action has been chosen for a received event message and its
   * persistence has been requested. The default implementation only logs the action.
   *
   * @param em            the received event message
   * @param appliedAction the action `receiveEvent` chose for `em`
   */
  def onEventReceived(em: EventMessage, appliedAction: SagaAction): Unit =
    appliedAction match {
      case DropEvent =>
        log.debug("Event dropped: {}", em.event)
      case RaiseEvent(e) =>
        log.debug("Event raised: {}", e)
    }
}