Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Actor Persistence #112

Open
softinio opened this issue Dec 18, 2019 · 7 comments
Open

Actor Persistence #112

softinio opened this issue Dec 18, 2019 · 7 comments
Assignees

Comments

@softinio
Copy link
Contributor

softinio commented Dec 18, 2019

Requirements

  • Add ability to persist actor state to enable recovery
  • Events by actor stored in a supported data store
  • Events are stored by appending and never mutated
  • Add recovery and reply ability by utilizing the persisted events
  • Support ability to recover from full history or a given checkpoint
  • Ability to set a checkpoint

Data Stores to support as a minimum

It would be ideal to have ZIO implementations as an optional dependency on maven for these datastores

  • Postgresql (ZIO Wrapper around Doobie or Skunk)
  • Kafka (Using zio-kafka)
  • Cassandra / ScyllaDB
  • Redis

Notes

  • This feature needs to be split up into multiple issues
@softinio softinio added this to To do in Zio Actors Project 1 via automation Dec 18, 2019
@softinio softinio self-assigned this Dec 18, 2019
@mtsokol
Copy link
Contributor

mtsokol commented Dec 27, 2019

@softinio Hi, I've started thinking about an API design for this ticket. Have you started working on this?

After some time of thinking about it on my own and then looking at Akka, I came to a conclusion that Akka's API design is suitable.

Do you think that is the correct approach? Or is there a better one?

Design

So, as we already have Stateful trait for describing actor's non-persisted behavior, first we need to introduce equivalent of EventSourcedBehavior:

trait EventSourcedStateful[S, +E <: Throwable, -F[_], Ev]

  def receive[A](state: S, msg: F[A], context: Context) :IO[E, (Event[Ev], A)]

  def sourceEvent(state: S, event: Ev): S

}

Same as in Akka, the API should provide two separate steps of processing a message:

  1. receive - An effectful processing with possible side effects e.g. spawning new actors. The state in the return type tuple is replaced with an Event which indicates whether to persist it or ignore (then we avoid persisting data retrieval messages and persist only those which change actor's state).
    Also this stage can fail. (similar to commandHandler in Akka)

  2. sourceEvent - Pure function that only updates actor's state, performs no side effects and never fails. This stage is persisted into datastore, where table would be like Akka journal table. (similar to eventHandler in Akka)

For representing events there's Event (the one used in return type in receive):

sealed trait Event[+Ev]
case class Persist[Ev](event: Ev) extends Event[Ev]
case object Ignore extends Event[Nothing]
object Event {
  def persist[Ev](event: Ev) = Persist(event)

  def ignore = Ignore
}

Such design is pretty much the same as in Akka, because we need a separation for impure message processing and pure state update that we persist.

Reading persisted events for given persistenceId is performed when an actor with such persistenceId is spawned. So the flow is: receive with side effects / failures => sourceEvent => persistence.

Example

sealed trait Msg[+A]
case object Increase extends Msg[Unit]

sealed trait MyEvent
case object SomeEvent extends MyEvent

new EventSourcedStateful[Int, Nothing, Msg, MyEvent] {

  override def receive[A](state: Int,
                          msg: Msg[A],
                          context: Context)
  : IO[Nothing, (Event[MyEvent], A)] =
    msg match {
      case Increase =>
        state match {
          case i if i > 10 =>
            IO.effectTotal((Event.ignore, ()))
          case _ =>
            IO.effectTotal((Event.persist(SomeEvent), ()))
        }
    }

  override def sourceEvent(state: Int, event: MyEvent): Int = 
    event match {
      case SomeEvent =>
        state + 1
  }

}

Thanks to using ZIO we can be sure that user won't make any side effects in sourceEvent inadvertently by returning just S. In Akka it's easy to do some side effects there so they provide warnings about it in docs: https://doc.akka.io/docs/akka/current/typed/persistence.html#event-handler.
(I think that's a great advantage to force it on API level!)

Also when spawning an actor with a EventSourcedStateful there will be also persistenceId to provide primary key for table storage.


What do you think about such design?

@softinio
Copy link
Contributor Author

@mtsokol Nice work Sir! Overall I am fine with your design but would be good to do more to distinguish our implementation from Akka.

I haven't started work on this myself yet, I assigned myself to it as I wanted to break it up into smaller issues as I feel this is a big topic to have as a single github issue.

Lets use this issue to finalize design and approach and create new related issues for doing the work so we can split the work.

We need to design a common Trait that gets extended for each datastore that we will support.

Will provide more feedback later this weekend once I have chance to think your design through more.

@mijicd Look forward to your feedback also on this approach?

@mijicd
Copy link
Member

mijicd commented Jan 3, 2020

I agree with both about taking the approach from Akka as the API design foundation. It is battle-tested, and anyone interested in this feature is more or less familiar with it. That being said, what's called Event in the proposal, is Command in Akka's terminology, and in my opinion, it should be aligned with it.

I don't agree with the statement about sourceEvent and ZIO. Based on my understanding of the proposal, ZIO won't provide freedom from side effects there.

@softinio you can "merge" ScyllaDB and Cassandra, the same client can be used for both.

@mtsokol
Copy link
Contributor

mtsokol commented Jan 15, 2020

@mijicd Thanks for your remarks, I've started working on this.

My clarification regarding the statement about sourceEvent: as the signature implies the function should be pure and commit no side effects - surely a ZIO effect can be run with unsafeRun or any side effect. So as almost whole API is wrapped in ZIO (and performs side effects), this signature indicates that this function should be pure (so it's at informative level), contrary to Akka where by signature it's not obvious that their sourceEvent needs to be pure (which is exclaimed in docs).
Did I get it right? Or am I missing something?

@mijicd
Copy link
Member

mijicd commented Jan 15, 2020

Well, that's what's not true :). Akka's signature is exactly the same as the one presented above: (S, E) => S. In other words, the guarantees we provide here are exactly the same.

@mtsokol
Copy link
Contributor

mtsokol commented Jan 15, 2020

Sure, I think I overinterpreted it then.

. akka zio-actors
commandHandler (S, C) => E (S, M) => IO[E]
eventHandler (S, E) => S (S, E) => S

So what I meant was that the lack of IO in eventHandler indicates that it should make no side effects in ZIO case, where in akka there's only plain Effect dsl returned for commandHandler.

But as I said I overinterpreted it 😛

@mijicd
Copy link
Member

mijicd commented Jan 15, 2020

commandHandler is clear and fine, with ZIO taking place of Effect. eventHandler is the same in both cases, relying on sanity in developing them :).

Anyhow, let's draft it out, and see how it evolves. It might show us further polishing opportunities.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Development

No branches or pull requests

3 participants