-
Notifications
You must be signed in to change notification settings - Fork 10
/
KeyedAlgebraCombinators.scala
131 lines (112 loc) · 4.81 KB
/
KeyedAlgebraCombinators.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package zio.entity.core
import izumi.reflect.Tag
import zio.entity.core.Combinators.ImpossibleTransitionException
import zio.entity.data.Versioned
import zio.{Chunk, IO, NonEmptyChunk, Ref, Task, UIO, ZIO}
/** [[Combinators]] implementation bound to a single entity `key`.
  *
  * The current state is cached in `state`; when the cache is empty it is rebuilt from the
  * snapshot store plus the event journal. Every `Throwable` raised by the underlying stores
  * or by the user fold is converted to `Reject` through `errorHandler`.
  *
  * @param key                     identifier of the entity this instance operates on
  * @param state                   in-memory cache of the entity state (`None` until first read)
  * @param userBehaviour           fold describing how events are applied to the state
  * @param errorHandler            converts infrastructure/fold failures into `Reject`
  * @param algebraCombinatorConfig provides the journal, offset store, snapshotting and tagging
  */
case class KeyedAlgebraCombinators[Key: Tag, State: Tag, Event: Tag, Reject](
  key: Key,
  state: Ref[Option[State]],
  userBehaviour: Fold[State, Event],
  errorHandler: Throwable => Reject,
  algebraCombinatorConfig: AlgebraCombinatorConfig[Key, State, Event]
) extends Combinators[State, Event, Reject] {
  import algebraCombinatorConfig._

  type Offset = Long

  /** Returns the cached state, recovering it (and filling the cache) when nothing is cached yet. */
  override def read: IO[Reject, State] =
    state.get.flatMap {
      case Some(cached) =>
        IO.succeed(cached)
      case None =>
        // Nothing cached: rebuild from snapshot + journal, then remember the result.
        for {
          recovered <- recover
          _         <- state.set(Some(recovered))
        } yield recovered
    }

  /** Applies the given events to the current state and persists them.
    *
    * Order of effects: compute the new state, append to the journal, update the in-memory
    * cache, snapshot, and finally store the new offset.
    */
  override def append(es: Event, other: Event*): IO[Reject, Unit] =
    for {
      offset <- getOffset
      events: NonEmptyChunk[Event] = NonEmptyChunk(es, other: _*)
      currentState <- read
      newState     <- userBehaviour.init(currentState).run(events).mapError(errorHandler)
      _            <- eventJournal.append(key, offset, events).mapError(errorHandler).provide(tagging)
      // Update the cache only once the journal write has succeeded; otherwise a failed
      // append would leave the cache holding a state that was never persisted.
      _ <- state.set(Some(newState))
      _ <- snapshotting.snapshot(key, Versioned(offset, currentState), Versioned(offset + events.size, newState)).mapError(errorHandler)
      _ <- eventJournalOffsetStore.setValue(key, offset + events.size).mapError(errorHandler)
    } yield ()

  /** Fails with the supplied rejection. */
  override def reject[A](r: Reject): IO[Reject, A] = IO.fail(r)

  /** Offset stored for `key`, defaulting to `0L` when the store has no value. */
  private def getOffset: IO[Reject, Offset] =
    eventJournalOffsetStore.getValue(key).bimap(errorHandler, _.getOrElse(0L))

  /** Rebuilds the state from the latest snapshot (or the fold's initial state when there is
    * none) by replaying journal events read from the snapshot version onwards.
    */
  private def recover: IO[Reject, State] =
    snapshotting.load(key).mapError(errorHandler).flatMap { versionedStateMaybe =>
      // No snapshot: start from version 0 and the user-provided initial state.
      val (offset, readStateFromSnapshot) =
        versionedStateMaybe.fold(0L -> userBehaviour.initial)(versionedState => versionedState.version -> versionedState.value)
      getOffset.flatMap {
        case offsetValue if offsetValue > 0 =>
          val foldBehaviour = userBehaviour.init(readStateFromSnapshot)
          eventJournal
            .read(key, offset)
            // foldWhileM keeps folding only while the predicate holds, so continue while the
            // folded position is still behind the stored offset and stop once it is reached.
            // NOTE(review): assumes entityEvent.sequenceNr is comparable with the stored offset - confirm.
            .foldWhileM(readStateFromSnapshot -> offset) { case (_, foldedOffset) => foldedOffset < offsetValue } { case ((state, _), entityEvent) =>
              foldBehaviour
                .reduce(state, entityEvent.payload)
                .bimap(
                  {
                    // Enrich the generic marker with the state/event that could not be combined.
                    case Fold.ImpossibleException => ImpossibleTransitionException(state, entityEvent)
                    case other                    => other
                  },
                  { processedState =>
                    processedState -> entityEvent.sequenceNr
                  }
                )
            }
            .bimap(errorHandler, _._1)
        // Stored offset is not positive: return the snapshot/initial state without reading the journal.
        case _ => IO.succeed(readStateFromSnapshot)
      }
    }
}
object KeyedAlgebraCombinators {

  /** Builds a [[KeyedAlgebraCombinators]] for `key` whose state cache starts out empty (`None`).
    *
    * @param key                     identifier of the entity
    * @param userBehaviour           fold used to apply events to the state
    * @param errorHandler            converts `Throwable` failures into `Reject`
    * @param algebraCombinatorConfig configuration passed through to the created instance
    */
  def fromParams[Key: Tag, State: Tag, Event: Tag, Reject](
    key: Key,
    userBehaviour: Fold[State, Event],
    errorHandler: Throwable => Reject,
    algebraCombinatorConfig: AlgebraCombinatorConfig[Key, State, Event]
  ): UIO[KeyedAlgebraCombinators[Key, State, Event, Reject]] =
    for {
      emptyCache <- Ref.make(Option.empty[State])
    } yield KeyedAlgebraCombinators[Key, State, Event, Reject](
      key = key,
      state = emptyCache,
      userBehaviour = userBehaviour,
      errorHandler = errorHandler,
      algebraCombinatorConfig = algebraCombinatorConfig
    )
}
// TODO: can the output be a IO instead?
/** An effectful left fold: a starting value plus a step function producing the next state.
  *
  * @param initial state the fold starts from
  * @param reduce  combines the current state with one event, yielding the next state in a `Task`
  */
final case class Fold[State, Event](initial: State, reduce: (State, Event) => Task[State]) {

  /** Same step function, restarted from `a`. */
  def init(a: State): Fold[State, Event] =
    Fold(a, reduce)

  /** Adapts the fold to consume `Y` values by translating each one to an `Event` with `f`. */
  def contramap[Y](f: Y => Event): Fold[State, Y] =
    Fold[State, Y](initial, (acc, y) => reduce(acc, f(y)))

  /** Folds every event of `gb`, in order, starting from `initial`. */
  def run(gb: Chunk[Event]): Task[State] =
    gb.foldM(initial)((acc, event) => reduce(acc, event))

  /** Views the fold through a part `B` of the state: each step writes the part back into
    * `initial` with `set`, reduces, and extracts the part again with `get`.
    */
  def focus[B](get: State => B)(set: (State, B) => State): Fold[B, Event] = {
    val step: (B, Event) => Task[B] =
      (part, event) => reduce(set(initial, part), event).map(get)
    Fold(get(initial), step)
  }

  /** Lifts the fold to a state `B`: `read` extracts a `State` to reduce, and `update`
    * merges the reduced `State` back into the current `B`.
    */
  def expand[B](init: State => B, read: B => State, update: (B, State) => B): Fold[B, Event] =
    Fold(
      init(initial),
      (current, event) => reduce(read(current), event).map(next => update(current, next))
    )
}
object Fold {

  /** Marker failure raised by [[impossible]]. */
  object ImpossibleException extends RuntimeException

  // TODO: add proper log
  /** Prints a diagnostic line to standard output and then fails with [[ImpossibleException]]. */
  val impossible: ZIO[Any, Throwable, Nothing] =
    ZIO
      .effectTotal(println("Impossible state exception"))
      .zipRight(ZIO.fail(ImpossibleException))

  /** Fold that ignores the event content and counts events, starting at `0L`. */
  def count[A]: Fold[Long, A] =
    Fold(0L, (seen, _) => Task.succeed(seen + 1L))
}