
Persistence Query for Cassandra #77

Open
6 tasks
zapletal-martin opened this issue Aug 24, 2015 · 58 comments

@zapletal-martin
Collaborator

zapletal-martin commented Aug 24, 2015

Akka Persistence Query complements Akka Persistence by providing a universal, asynchronous, stream-based query interface that journal plugins can implement in order to expose their query capabilities. The API is documented at http://doc.akka.io/docs/akka/snapshot/scala/persistence-query.html#persistence-query-scala. An example implementation using LevelDB is described at http://doc.akka.io/docs/akka/snapshot/scala/persistence-query-leveldb.html#persistence-query-leveldb-scala.

akka-persistence-cassandra should support this new query-side API. The API is available in Akka 2.4, so this work will need to be done against the akka-persistence-cassandra version supporting 2.4. It will also require introducing a dependency on Akka Streams.

The work is tracked in the following tickets

@zapletal-martin
Collaborator Author

I have started working on this issue. Will update with progress and create a PR when ready.

@chbatey
Collaborator

chbatey commented Aug 24, 2015

Awesome, I had planned to start on it, but go for it. PR #69 brings the dependency up to 2.4-RC1 and should get merged soon, so it's worth basing your work off it.

@ktoso

ktoso commented Aug 24, 2015

Very cool, thanks for hopping on to it @zapletal-martin :-)
Please be aware that the new query API is experimental and we're still looking for the best way to expose it - feedback is very welcome (!), here's our general discussion ticket: akka/akka#18229

@juanjovazquez

+1

Thanks @zapletal-martin for bringing this here. This is a real MUST for us. We're thrilled to improve the READ layer in our CQRS stuff. We'll have a look as soon as you share your work. Of course @krasserm should coordinate this effort IMHO :)

@chbatey
Collaborator

chbatey commented Sep 2, 2015

I'd love to hear more use cases, my initial thoughts:

  • Anyone who has chosen C* as the backing store will typically have too many events for AllPersistenceIds to be feasible.
  • EventsByPersistenceId is the easy one: we can use back pressure to drive the paging of the query (http://datastax.github.io/java-driver/features/paging/).
  • EventsByPersistenceIds is feasible for small numbers of ids.
  • EventsByTag will be more feasible once C* 3 is out and we have materialised views.

There's also a possibility we don't need to go via the query interface, though this is a lot of work. Spark streaming has similar requirements, check out https://issues.apache.org/jira/browse/CASSANDRA-8844, but that is a long way off :)

I definitely think we should discuss this in depth before Akka 2.4 / plugin 0.4 is released in case we want to make any data model changes.

@zapletal-martin
Collaborator Author

Thanks @chbatey .

As I see it there are 3 options:

  1. push to stream from Cassandra (currently not possible out of the box. At least not until the CDC is implemented or something like https://github.com/adkhare/CassandraKafkaIndex is feasible or we implement something)
  2. push to stream from write journal (as mentioned in the PR. I still did not have a chance to investigate if it would work correctly (e.g. if all information would be available, ordering for Tagged events etc.) and what performance impact it might have)
  3. purely pull from Cassandra

If we wanted to use 3) the ideal case would be to have defined ordering (persistenceIds, events by persistenceId and events by tag) and be able to query using offset.

AllPersistenceIds - relatively simple to achieve at the moment without any data model changes, but as you mention it may not be feasible given a large number of events, and without a notion of order it is hard to detect a change cheaply (set difference). A more specialised data structure with the above-mentioned properties would work much better.

EventsByPersistenceId - paging could work well, but it still does not support offset queries. We may need to run the query over a fromSequenceNr to toSequenceNr range; that requirement is directly in the API and would be required for a pull-only approach.

@chbatey
Collaborator

chbatey commented Sep 2, 2015

I'd maybe incorrectly assumed that this had to work in a different JVM from the writer, WDYT?

@zapletal-martin
Collaborator Author

The read and write journal running in the same actor system was probably my incorrect assumption, based on reading the LevelDB implementation. I assume you should be able to execute the query from anywhere, so that option is not relevant unless we want to communicate remotely.

@juanjovazquez

Just in case this helps: in our application we have lots of shards of actors where each shard represents an aggregate in terms of DDD. Every instance of each aggregate type has its own persistenceId and lifecycle, of course. So, from my point of view, all we need is to project every event raised by an aggregate and update the views accordingly.

I can imagine that EventsByPersistenceId wouldn't be the way to go because in that case we'd have a live stream instance per persistenceId, one per order for example, unless we have some kind of passivation mechanism as we already do with aggregates.

Maybe the EventsByTag query could work if we tag our shards with a label, for example "order" in the classical e-commerce analogy. In that case, it'd be possible to maintain a stream-per-shard approach.

I'm not sure if I'm understanding the proposed query API correctly but IMHO our case could be viewed as a canonical example of DDD/CQRS and Event Sourcing approach.

Hope this helps.

@krasserm
Owner

krasserm commented Sep 3, 2015

Here are my initial thoughts on the general architecture, constraints, implementation options and open issues.

Architecture

  1. Master data management. This architectural component is already in place backed by the messages table and the CassandraJournal actor is used for writing and reading events to and from that table. From a data organization perspective, events are ordered per PersistentActor (i.e. persistenceId) and no stronger ordering (such as insertion order per CassandraJournal instance, for example) is maintained.
  2. Query index management (not implemented yet). Data in the query index are derived from master data and are used to serve akka-persistence-query requests efficiently. In addition to eventual consistency, we also need to preserve causal consistency between master data and index data: given a happened-before relation between events x and y, x -> y, the index may only be updated with y if the update with x was successful. Otherwise we might run into problems comparable to #48 (Possible to read data with partially applied or unapplied batches during replay?). In Akka Persistence, a happened-before relation is only defined for events with the same persistenceId. Stronger partial orderings are not required but can be provided by plugins (such as insertion order per CassandraJournal instance, see later).
  3. Stream production (not implemented yet). A component that creates event streams from index data. It not only serves streams from past (= stored events) but also updates streams from live events (i.e. events that have been recently written). Here, it is also important that we ensure causal consistency i.e. delivery order of events must be consistent with causal order (or any higher partial order, if supported).

Implementation options

  • Re (1): Although this component is already in place, it currently loses the insertion order of events generated per CassandraJournal instance. Consequently, we cannot easily determine what the last n events were, which is very important for efficiently updating streams with live events (= recently added events). With an insertion-ordered event table we can efficiently serve an all-events stream from which applications can additionally derive other streams that are not directly supported by the akka-persistence-cassandra plugin.

    Nevertheless, an insertion-ordered event table makes event replay to PersistentActors less efficient and we need to create a separate persistenceId index (which is anyway needed for akka-persistence-query). Since Akka Persistence requires read-your-write consistency for PersistentActors, eventually consistent index reads must be completed with reads of remaining events from the master event table. Eventuate for example is already doing this in its Cassandra event log implementation. All other reads don't need read-your-write consistency.

  • Re (2): This looks like a good fit for Cassandra 3 materialized views, but I don't know if view updates preserve causal consistency: since we are writing events to several partitions, can we make sure that a materialized view is never updated with an event from partition n+1 before all events from partition n have been processed? If this cannot be guaranteed, we need to write our own indexer, which can run in a separate process, independent from all journal actors, if needed.

  • Re (3): With the proposed insertion-order event storage in the master table and a causally consistent index creation, we can efficiently support offset queries, updates of streams with live events etc. and repeated queries would result in the same event ordering (although not required by akka-persistence-query but relevant if we plan to support insertion order event delivery, for example).

    Assuming that the upcoming CDC supports materialized views and preserves insertion order, it seems to be a good implementation technology candidate. Given its current unavailability, however, I think our only option is the pull approach listed as option 3 by @zapletal-martin. Pushing live events from journal actors to streams directly is possible but significantly more effort (and more complex) because of the causal consistency requirement. We should consider it as an optimization after the pull approach has been implemented.

Before we start discussing implementation details, I'd find it helpful that we agree on the general architecture, constraints and high-level implementation options. Thoughts?

@zapletal-martin
Collaborator Author

Thanks @krasserm for a detailed analysis. I conceptually agree with what you have outlined. There are still unknowns regarding the actual implementation, but we will surely discuss those later.

I have done some work mostly on the Stream Production component. It may not be relevant as we need to discuss how exactly it will work, but I envision it to be relatively straightforward.

One thing we must take into consideration during design is backwards compatibility. It may depend on the actual implementation, but I assume we want to be able to correctly apply persistence query to events stored before the change as well.

I have a few clarification questions. I prefer to ask to make sure we all have shared clear understanding.

Re 1: "Consequently, we cannot easily determine what have been the last n events, but this is very important for efficiently updating streams with live events." Shouldn't the typical query rather be events after the nth (i.e. an offset) to determine what is new in the stream?

"Since Akka Persistence requires read-your-write consistency for PersistentActors, eventually consistent index reads must be completed with reads of remaining events from the master event table." I probably don't understand the wording and proposed responsibilities of index and master data clearly. What do you mean by index and master here? Would current master change? Why wouldn't Akka Persistence read directly from master table similarly to current functionality?

Re 2: Good point. Let's find this out.

Re 3: "and repeated queries would result in the same event ordering (although not required by akka-persistence-query but relevant if we plan to support insertion order event delivery, for example)". I believe same event ordering of repeated queries is a requirement of akka persistence query. The documentation for EventsByPersistenceId reads "The same prefix of stream elements (in same order) are returned for multiple executions of the query, except for when events have been deleted." and for EventsByTag "The returned event stream is ordered by the offset (tag sequence number), which corresponds to the same order as the write journal stored the events. The same stream elements (in same order) are returned for multiple executions of the query."

@krasserm krasserm added ready and removed in progress labels Sep 9, 2015
@krasserm
Owner

One thing we must take into consideration during design is backwards compatibility.

We can also achieve that with a migration tool or script. Breaking the existing schema shouldn't be an issue then.

Shouldn't the typical query rather be events after nth (i.e. offset) to determine what is new in the stream?

Yes exactly, my description was rather imprecise.

What do you mean by index and master here?

With master I mean the raw event log(s) as written by journal actors. With index, I mean tables derived from that log to serve queries efficiently, such a query by tag, for example. Instead of custom tables, this can also be a Cassandra secondary index but I'm not sure at the moment if this has an impact on write throughput, causal consistency or has other issues. Another example are Cassandra materialized views, as already mentioned in a previous comment. Anyway, I hope this clarifies what I mean with master and index.

Would current master change?

If we want to produce streams with journal insertion-order then we should consider a change of the master schema. Instead of ordering events by (persistenceId, sequenceNr) we would preserve an event order that is consistent with the insertion order from multiple journal actors.

Why wouldn't Akka Persistence read directly from master table similarly to current functionality?

With the above changes in the master, a query by persistenceId would then require a full log scan which is inefficient.

I believe same event ordering of repeated queries is a requirement of akka persistence query.

Ok, didn't know that.

@zapletal-martin
Collaborator Author

We can also achieve that with a migration tool or script. Breaking the existing schema shouldn't be an issue then.

Breaking an existing schema is not an issue, but I initially thought (re)ordering information might be. See below.

Currently events are ordered by persistenceId and sequenceNr, which maintains a partial ordering per CassandraJournal per persistenceId. Unlike in Eventuate, persistenceIds cannot be shared by multiple instances in Akka Persistence, so this was fine.

I believe same event ordering of repeated queries is a requirement of akka persistence query.

Sorry, the ordering requirements I mentioned in my previous comment were not accurate and were only true for the LevelDB journal. The documentation clearly reads "Akka persistence query is purposely designed to be a very loosely specified API. A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as EventsByTag is that the order of events at which the events appear in the stream rarely is guaranteed (or stable between materializations). Journals may choose to opt for strict ordering of the events, and should then document explicitly what kind of ordering guarantee they provide - for example "ordered by timestamp ascending, independently of persistenceId" is easy to achieve on relational databases, yet may be hard to implement efficiently on plain key-value datastores.".

You mentioned, however, that we would want insertion order to be able to efficiently update a stream, achieve resumable projections, and have stability between materializations, which I agree with (although this is not a requirement).

You also mentioned that this would require

insertion order of events generated per CassandraJournal instance.

Is that correct? Wouldn't it rather require order across CassandraJournal instances and persistenceIds/tags (for AllPersistenceIds and EventsByTag)? I am avoiding the term total order because of the lighter requirements if documented properly, see below.

Since that requirement did not exist previously, I thought it could potentially be challenging to infer the ordering after the fact. Tags are yet to be introduced, so backwards compatibility is not an issue there; we just need to design how to achieve the order going forward. The ordering requirements are not too strict across instances, as we don't necessarily need to know the real total order (we just need it to be stable across materializations) and could therefore avoid coordination to obtain it. For AllPersistenceIds and EventsByPersistenceId we should be able to infer the required information during migration if needed.

We may need to implement tagging for Akka Persistence Cassandra, which I believe does not exist yet. I will create a separate issue for this, but it should be aligned with the outcome of our discussion here.

I will work on designing the queries and data representation and report back when ready.

@krasserm
Owner

You can create a total order without coordinating among journal actors. The simplest case is just merging the streams written by each journal actor. If the merged stream shall be repeatable it must be persisted before delivering it.

This is fine as long as streams written by different journal actors are independent of each other. This is usually not the case: if a persistent actor is migrated from one journal actor to another (for example, during a failover in a cluster), there is a dependency because events with the same persistenceId are now partitioned across streams.

Simply merging these streams (i.e. without further constraints) might deliver these events in wrong order. What is therefore needed is a stream merge logic that enforces causal delivery per persistenceId, which shouldn't be that difficult to implement.
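The merge logic described above can be sketched in plain Scala (a minimal sketch; Event and the log layout are illustrative, not the plugin's actual types). Each input log is assumed to already be in causal order; the merge only emits an event once all earlier events for the same persistenceId have been emitted, so events of a migrated persistent actor come out in sequence-number order:

```scala
final case class Event(persistenceId: String, seqNr: Long, payload: String)

// Merge per-journal logs into one stream whose order is consistent with
// causal order per persistenceId. Events whose predecessors are missing
// stay buffered in their queue and are simply not emitted.
def causalMerge(logs: Seq[Vector[Event]]): Vector[Event] = {
  val queues   = logs.map(log => scala.collection.mutable.Queue.from(log))
  val expected = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(1L)
  val out      = Vector.newBuilder[Event]
  var progress = true
  while (progress) {
    progress = false
    queues.foreach { q =>
      // drain this log while its head is the next expected event for its persistenceId
      while (q.nonEmpty && q.head.seqNr == expected(q.head.persistenceId)) {
        val e = q.dequeue()
        expected(e.persistenceId) = e.seqNr + 1
        out += e
        progress = true
      }
    }
  }
  out.result()
}
```

A real implementation would work incrementally over live streams rather than fully materialized vectors, but the invariant is the same.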

@krasserm
Owner

Regarding re-ordering of events written by older journal versions, I wouldn't even try to do that, but rather deliver the events for each persistent actor independently, either sequentially or interleaved. Applications shouldn't care anyway, because they must assume that events with different persistenceIds are not related.

In other words, events written by older journal versions can be used but the new ordering guarantees only apply to events written by the new version.

@krasserm
Owner

To clarify, the aforementioned total order is not necessarily wall clock time order (although it may come close).

@zapletal-martin
Collaborator Author

Thanks @krasserm. It is a significant change to the current functionality. It will be crucial to implement the stream merging / master-to-index transformation. It should then be possible to achieve the desired guarantees as described before.

One thing where I am still not completely clear architecture-wise is the indisputable advantage of separating master and index. The alternative I am considering would be to have just the index part (e.g. events by persistence id, same as currently; events by tag; and some efficient query structure for persistenceIds). That should efficiently support all the required queries, including the current needs for replay etc. CassandraJournal would update all 3 tables directly when writing, rather than updating master and having a separate process for the denormalized replication. I have not yet thought about the table representations, but it could potentially simplify the implementation (master-to-index replication and replay are not trivial problems), maybe at the expense of some write speed (if any) and generality (e.g. supporting currently-not-required queries). Or am I missing something?

I have implemented the Stream production component as an ActorPublisher that can be used to create an Akka Streams Source. It supports live streams, a refresh interval, a max buffer size and causal consistency, given the aforementioned index structure that can be queried by offset.

The next steps will be to start implementing the master and index structures and the merging component. I do not yet have a clear vision of the implementation, so please feel free to make suggestions. If materialized views support the required guarantees, that could simplify the implementation, but it would mean a dependency on Cassandra 3, which I am not sure is an option.

Since the whole issue is a relatively large amount of work, should I create multiple smaller tickets and create smaller PRs into a separate branch, or how would you like to see the work managed?

@zapletal-martin
Collaborator Author

Regardless of the master-index transformation solution we need to have a discussion about data representation.

EventsByPersistenceId
Queries
  • select event by persistence_id, partition_nr using sequence_nr offset.

The existing table representation PRIMARY KEY ((persistence_id, partition_nr), sequence_nr) should efficiently support the above query and therefore both Persistence Query and replaying. It is also convenient that, thanks to the per-persistence_id order, partitioning can be easily managed even across multiple CassandraJournal instances.
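For concreteness, the query side of this layout might look like the following CQL sketch (table and column names follow the current messages schema but are illustrative):

```sql
-- Existing layout: PRIMARY KEY ((persistence_id, partition_nr), sequence_nr)
-- Offset query for EventsByPersistenceId over a fromSequenceNr..toSequenceNr range:
SELECT * FROM messages
WHERE persistence_id = ?
  AND partition_nr   = ?
  AND sequence_nr   >= ?   -- fromSequenceNr (the offset)
  AND sequence_nr   <= ?;  -- toSequenceNr
```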

EventsByTag
Queries
  • select event by persistence_id, partition_nr using offset.

This looks similar to the previous query but is actually quite different, because tags span persistence_ids and CassandraJournal instances. There are several options for implementing this:

  1. Secondary index on the Tag column. This could be inefficient depending on the data; see for instance http://www.wentnet.com/blog/?p=77 or https://pantheon.io/blog/cassandra-scale-problem-secondary-indexes.
  2. A new table representing tags by persistence id, using something along the lines of PRIMARY KEY ((tag, ???), timeuuid). The major difference to EventsByPersistenceId is that events with a tag are not ordered by sequence_nr and are not local to a single CassandraJournal instance, so we don't have the option to obtain sequence_nr and partition_nr per tag easily. Partitioning using the event's partition_nr and sequence_nr would not make much sense considering their local character. We could use a central sequencing service (e.g. a cluster singleton), but that could become a bottleneck during high-volume writes. Incrementing a column value on each insert could be an option, but seems very inefficient (it would have to be a read-write transaction). Partitioning/clustering based on time could be an option here.
  3. Materialized view. This option is quite similar to option 2, with the advantage that the denormalization/(re)partitioning would be done automatically server-side. The remarks regarding partitioning remain mostly the same. Materialized views are not yet available and would not work for older Cassandra versions.
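A time-partitioned variant of option 2 could look roughly like this (a sketch only; the table name, bucket format and column set are assumptions):

```sql
CREATE TABLE IF NOT EXISTS events_by_tag (
  tag            text,
  time_bucket    text,      -- e.g. a day bucket such as '20151105', to bound partition size
  event_id       timeuuid,  -- insertion-time ordering within the bucket
  persistence_id text,
  sequence_nr    bigint,
  event          blob,
  PRIMARY KEY ((tag, time_bucket), event_id)
);

-- Consumers page through buckets in order, resuming from a timeuuid offset:
SELECT * FROM events_by_tag
WHERE tag = ? AND time_bucket = ? AND event_id > ?;
```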
AllPersistenceIds
Queries
  • select persistence_id using offset

This query is more difficult. We would ideally store just the distinct values with an idempotency attribute, so that inserts of the same persistence_id would have no effect (an insertion-time-ordered set). The options could be:

  1. Using an existing table partitioned by persistence_id and a SELECT DISTINCT query. I am however not convinced about the scalability and ordering of such a solution.
  2. A separate table with persistence_ids stored. Avoiding duplication would generally mean a need for insert-if-not-exists semantics. I was hoping we could use the idempotency attribute of the operation to ensure we can write without batch/LWT, but I could not find an efficient representation (e.g. something along the lines of CREATE TABLE IF NOT EXISTS unique_persistence_ids (whatever text, persistence_id text, PRIMARY KEY ((whatever), persistence_id)), but that would be ordered by persistence_id rather than insertion time, and the partitioning would not be ideal either).
  3. A table with a persistence_id partitioning key, such as unique_persistence_ids, and another table with the required ordering and support for range queries, such as persistence_ids_by_insertion_time. It would then be possible to use a batch in combination with LWT to ensure uniqueness as well as the required query properties, but at the expense of some performance loss.
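Option 3 could be sketched as follows (names are assumptions; note that a conditional insert cannot be combined with writes to other partitions in a single logged batch, so the LWT would have to be issued first and the ordered insert only on success):

```sql
CREATE TABLE IF NOT EXISTS unique_persistence_ids (
  persistence_id text PRIMARY KEY
);

CREATE TABLE IF NOT EXISTS persistence_ids_by_insertion_time (
  bucket         text,      -- coarse time bucket to keep partitions bounded
  inserted       timeuuid,
  persistence_id text,
  PRIMARY KEY ((bucket), inserted)
);

-- Claim the id first; run the ordered insert only when [applied] = true:
INSERT INTO unique_persistence_ids (persistence_id) VALUES (?) IF NOT EXISTS;
INSERT INTO persistence_ids_by_insertion_time (bucket, inserted, persistence_id)
VALUES (?, now(), ?);
```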
Master
Queries
  • select by persistence_id where sequence_nr > ?

If we had master data we would have to implement completion of reads during replay to ensure RYOW (read-your-write) consistency, as mentioned before. We would therefore have to be able to look up events for a given persistence_id with sequence_nr higher than the highest sequence_nr found in the index table.

If we didn't use a master we could update all the tables in a batch, with the potential impact of such an approach.

As for transforming data from master to index, it may not be trivial, because 1) any CassandraJournal may be removed at any time and therefore the transformation must not be tied to it, and 2) during replay, if we wanted to complete the read from master, we may need to check multiple CassandraJournal-ordered partitions, again because an actor can change CassandraJournal.

Feedback and Cassandra data modelling advice appreciated.

@krasserm
Owner

krasserm commented Oct 4, 2015

what is the indisputable advantage of separating master and index.

It's the same advantage that all databases get from internally using a transaction log (= master) and deriving structured tables from it (= index). With event sourcing and CQRS this separation is visible to the application, and entries in the transaction log (= event log) are usually not deleted. Writes to the master should be atomic and isolated (see also #48).

This is in general independent of how the master is structured. You could write a single log (per journal actor) or n logical logs, one per PersistentActor, i.e. a table that is sorted by (persistenceId, sequenceNr) as is the case at the moment. To create an index from n logical logs, you'd need to process them independently, which may be challenging for a large number of persistent actors.

CassandraJournal would update all 3 tables directly when writing

How do you handle failures in this case, especially with respect to atomicity and isolation? Assuming that different index writes go to different partitions, you could use a logged C* batch to ensure atomicity (not isolation), but this would break the requirement of Akka Persistence that reads by persistenceId must have read-your-write consistency: a logged C* batch only gives you eventual read consistency.

Alternatively, the journal actor could make an isolated and atomic write to the master and, if this write succeeds, make further index writes. This would meet the read-your-write consistency requirement but introduces sequential writes (one batch write to the master and then one or more concurrent writes to the index), which will reduce write throughput. Furthermore, if index writes fail, you need to retry them, and that is best done with a separate process/worker ... and we again end up with the architecture that separates a master from an index.

Hope this makes my motivation for the proposed architecture clearer. There's nothing special about it; it only mirrors how most databases internally work. Anyway, for progressing with the implementation, I'm fine with an experimental version where the journal actor makes all the writes, as long as we document the consequences and later migrate to a clean master/index separation.

I have implemented the Stream production component supporting live streams, refresh interval, max buffer size and causal consistency given the aforementioned index structure that can be queried by offset as an ActorPublisher that can be used to create an Akka Streams Source.

Great. Looking forward to review the pull request(s).

Since the whole issue is a relatively large amount of work, should I create multiple smaller tickets and create smaller PRs into a separate branch, or how would you like to see the work managed?

+1

@krasserm
Owner

krasserm commented Oct 4, 2015

Here are some thoughts regarding data processing and representation which I think could solve the issues you mentioned in your previous comment. I make the following assumptions:

  • Each journal actor writes to a separate log (as described here). These logs contain the master data.
  • A total order of events that is consistent with causal order can be generated by merging these logs. This can be done with stream-merging using merge logic that enforces causal delivery per persistenceId. The generated order is used to update the index.
  • Each index (IndexByTag, IndexByPersistenceId, ...) stores the read progress from the master logs i.e. a mapping log id -> sequence-nr (where the sequence-nr is generated by the journal actor). In failure cases, this is needed to resume index creation.
  • The master component supports the creation of a merged stream from given offsets in the master logs (needed for incremental, fault-tolerant index update).
  • An index updater is a single writer (actor, process, worker or whatever) that maintains a counter (= index writer specific sequence number) so that index entries can be ordered by write order. Since the write order is consistent with the causal order delivered by the merged stream from the master, overall causal consistency is preserved.

Using these assumptions:

  • an index by tag can be created using a table with PRIMARY KEY (tag, index-writer-sequence-nr), which results in a table that can easily be queried by tag and whose ordering is consistent with causal event ordering.
  • an index by persistenceId can be created using a table with PRIMARY KEY (persistenceId, persistent-actor-sequence-nr).

To achieve read-your-write consistency from the index (needed during replay, for example), the same mechanism as for resuming index updates can be used: first read the index, then complete the index read by reading from a merged stream from the master.

This approach not only makes us more independent from Cassandra implementation details such as secondary indices, materialized views or whatever, it could also allow us to do master data management with a different storage backend, such as Kafka, for example (using a Kafka partition per journal actor).

Following this approach would make it rather easy to reason about data consistency and to deal with failures, IMO. It is furthermore very similar to how many stream processing and data analytics pipelines are structured (in contrast to updating several indices simultaneously from a single writer). The only critical thing we need to implement is a stateful, causality-preserving stream merge from the master logs (for which solutions definitely exist).
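The single-writer index updater with resumable read progress could be sketched like this (plain Scala; names are illustrative only, and persisting the counter and progress map is omitted):

```scala
final case class LogEvent(logId: String, logSeqNr: Long, persistenceId: String, tag: String)
final case class IndexEntry(indexSeqNr: Long, event: LogEvent)

// Single writer over the causally merged master stream. Assigns a monotonically
// increasing index-writer sequence number and tracks per-log read progress
// (log id -> sequence-nr) so indexing can resume after a failure.
final class IndexUpdater(initialProgress: Map[String, Long] = Map.empty) {
  private var counter  = 0L
  private var progress = initialProgress

  // Returns None when the event was already indexed (e.g. replayed after a crash).
  def update(e: LogEvent): Option[IndexEntry] =
    if (e.logSeqNr <= progress.getOrElse(e.logId, 0L)) None
    else {
      counter += 1
      progress = progress.updated(e.logId, e.logSeqNr)
      Some(IndexEntry(counter, e))
    }

  def readProgress: Map[String, Long] = progress
}
```

Because the write order assigned here follows the merged stream's delivery order, the index ordering stays consistent with causal order.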

@patriknw
Collaborator

@krasserm @zapletal-martin Now that Cassandra 3.0 is released with the new materialized views, do you still see any reason for not using them? The EventsByTag query is important for the project that I'm currently working on, so I would like to help out implementing it.

I played around with materialized views and here is a cqlsh session for EventsByTag: https://gist.github.com/patriknw/4bcec28b8d3e5c5e56cc

@chbatey
Collaborator

chbatey commented Nov 16, 2015

Hey @patriknw, is this going to production any time soon? MVs are a very experimental feature; I would wait for a major release before using them.

@patriknw
Copy link
Collaborator

so you are saying that MVs in Cassandra 3.0 are not production ready?

@chbatey
Copy link
Collaborator

chbatey commented Nov 16, 2015

Definitely not; it normally takes 6 months for a major release to become stable, especially for any headline feature, e.g. Lightweight Transactions in 2.0.

@patriknw
Copy link
Collaborator

Thanks for the information.
Anyway, Akka Persistence Query is also experimental, and I doubt that we will be able to implement manual materialized views in a better way than what is done in C*. However, there might be something missing feature-wise?

@chbatey
Copy link
Collaborator

chbatey commented Nov 16, 2015

For EventsByTag I can't see any issues feature-wise. However, we'd need to be careful about partition size; we wouldn't want a partition to grow beyond a few million events with the same tag.

To get around this problem for persistenceId in the raw data table we add our own synthetic partitioning, which we could also do for the events-by-tag table if it were hand-crafted rather than using the built-in MV feature.

Or if we added year, month, and day columns to the original table we could use them in the MV partition key. This would make queries slower due to multi-partition scanning in cases where there aren't many events for a tag (unless you always query for the latest events less than a day old), but would cap each partition at one day's worth of events for a tag.
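The day-bucketing idea can be sketched like this (an assumed helper, not actual plugin code; the bucket format and key shape are illustrative):

```scala
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Derive a (tag, yyyyMMdd) partition key so that one partition holds at most
// one day's worth of events for a tag. Queries for recent events touch only
// today's (and possibly yesterday's) bucket; older ranges need to scan one
// partition per day.
val dayFormat: DateTimeFormatter =
  DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC)

def tagPartitionKey(tag: String, timestampMillis: Long): (String, String) =
  (tag, dayFormat.format(Instant.ofEpochMilli(timestampMillis)))
```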

@patriknw
Copy link
Collaborator

That is a good point.

Another thing I'm uncertain about is what we can use as the offset column. I was thinking of a timeuuid, but that might be too weak?

@zapletal-martin
Copy link
Collaborator Author

I would generally be in favour of using materialized views over custom solution, because it would simplify the solution and some of the difficult problems would be solved for us. But there are a few things I can see as potential issues.

  1. Do we know if materialized views provide causal consistency in addition to eventual consistency?
  2. Materialized views have PK limitations: the MV primary key needs to include all primary key columns of the base table and may include at most one column that is not part of the base table's PK (CASSANDRA-9928). Does that give us enough flexibility?
  3. Advanced functions - as you mentioned we may want to limit partition size, check for uniqueness (all persistence ids query), maintain offset etc. There may be ways to get around this, but are they fit for our use case?
  4. Fault tolerance - how do MVs work under failure conditions? Is causality still maintained? (I would assume failure scenarios are handled correctly, but can't find any references).
  5. Compatibility - can we assume Cassandra 3 is used?

@krasserm
Copy link
Owner

I share your concerns @zapletal-martin, especially the causal consistency concern. Since our raw data table is partitioned and there's only eventual consistency between a table and its corresponding MV, we might end up in the following situation:

Given that event_i is written to partition_j and then event_i+1 is written to partition_j+1, the MV update for partition_j+1 may occur before that for partition_j (as we only have eventual consistency guarantees). A read between these two MV updates violates causal consistency if we read event_i+1 from the MV but not event_i.

The situation gets even more problematic if the MV update order is not even defined for writes to the same partition. By more problematic I mean that the chances of inconsistent reads would increase significantly.

@chbatey are there any MV update ordering guarantees given by Cassandra 3.0 to avoid these potential problems?

@chbatey
Copy link
Collaborator

chbatey commented Nov 17, 2015

No. I had a similar example typed up on my phone. The writes to the MV are asynchronous so a newer MV write could potentially overtake an older one.


@patriknw
Copy link
Collaborator

Interesting questions, indeed. They discuss consistency at length in CASSANDRA-6477, but I'm not sure what the conclusion or implementation is.

I assume that MV is updated in the same order as the base table for one partition, i.e. we have causal consistency per persistenceId (with some caveat when we change partitionId for a persistenceId).

I assume that we don't have causal consistency across different persistenceId, and even though it would be convenient to have that I don't think it's critical.

We can achieve a best-effort experience by ordering on a timeuuid on the read side and delaying reading the tip by a few seconds. That will not work in the case of network partitions, where MV updates may be delayed a long time, which is also why I don't think we should guarantee exactly the same results for a query that is run multiple times. A later query may return more events than an earlier query, because the MV was updated with old events that got stuck during the network partition.

By delivering the unique timeuuid for each event to the application, it can retry the queries and filter out duplicates. We can also provide a count query for quickly checking whether there are any more events.
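Client-side retry with de-duplication could look roughly like this (`TaggedEvent` and its offset field are assumed shapes for illustration, not the plugin's API):

```scala
// The journal delivers a unique offset (e.g. a timeuuid) with every event.
// A consumer that retries queries remembers the offsets it has already
// processed and silently drops duplicates from overlapping result sets.
final case class TaggedEvent(offset: String, payload: String)

final class DeduplicatingConsumer {
  private val seen = scala.collection.mutable.Set.empty[String]
  private val processed = scala.collection.mutable.ArrayBuffer.empty[String]

  def onEvent(e: TaggedEvent): Unit =
    if (seen.add(e.offset)) processed += e.payload // Set.add returns false for duplicates

  def result: Seq[String] = processed.toSeq
}
```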

@chbatey
Copy link
Collaborator

chbatey commented Nov 17, 2015

I assume that MV is updated in the same order as the base table for one partition, i.e. we have causal consistency per persistenceId (with some caveat when we change partitionId for a persistenceId).

I am not sure we can assume that. Writes for persistenceId seq nr 1 could go to one node, seq nr 2 could go to another node, and the async replication of the MV for write two could become available before write one.

Joel Knighton (@joelknighton) did some testing with Jepsen on MVs; I will ask him to take a look.

@zapletal-martin
Copy link
Collaborator Author

I assume that we don't have causal consistency across different persistenceId, and even though it would be convenient to have that I don't think it's critical.

We can achieve a best-effort experience by ordering on a timeuuid on the read side and delaying reading the tip by a few seconds. That will not work in the case of network partitions, where MV updates may be delayed a long time, which is also why I don't think we should guarantee exactly the same results for a query that is run multiple times. A later query may return more events than an earlier query, because the MV was updated with old events that got stuck during the network partition.

Yes, that is a solution that would work. It however explicitly chooses certain tradeoffs (e.g. not guaranteeing the exact same results or causality, and relying on timing heuristics instead of guaranteed correctness). Given how MVs seem to work we would have to choose some tradeoffs in any case.

It seems the decision is not as much about implementation details, but about conceptual correctness, implementation complexity, use cases and requirements of persistence query.

@zapletal-martin
Copy link
Collaborator Author

I attempted an implementation of the solution that uses Cassandra's materialized views. It involves storing the tag, changes to replay, and an events-by-tag query actor. The implementation seems to be pretty simple for the eventsByTag query. We could subpartition the data based on a timestamp (either configurable by the user based on their needs, or year/month/day etc.). Alternatively we could use a different storage format where persistenceId would not be the primary key, to give us more freedom, but that would complicate recovery.

There seem to be some caveats; for example, we would have to have multiple rows per persistenceId-sequenceNr combination (an event can have multiple tags and a C* collection is not an option in this case), which complicates replay and stream emitting slightly. The QueryActorPublisher we have should work for the EventsByTag query without major changes. Overall it seems that the implementation of EventsByTag is pretty quick and straightforward; AllPersistenceIds and other complicated queries not as much, though.

To summarise the advantages and disadvantages to help us make the best decision

Materialized views
Advantages

  • Simple and quick implementation
  • Some of the difficult work handled for us with good quality

Disadvantages

  • We cannot manually control partitioning (which may mean multi-partition scanning, non-optimal partition sizes etc.)
  • We cannot use additional precomputation logic, e.g. grouping
  • Limited flexibility (partition key limitations etc.)
  • No control over the repartitioning (so we may need partial workarounds such as delayed reads, randomness and timeouts, and cannot provide guarantees beyond eventual consistency)
  • Only available in Cassandra 3
  • No causal consistency guarantee
  • No guarantee of the same result for repeated queries

Custom solution
Advantages

  • Manual control of partitioning
  • We can use additional logic during repartitioning/merging (e.g. for allPersistenceIds or whatever queries we may need)
  • Additional consistency guarantees, causality
  • Same results for repeated queries
  • A lot of control (e.g. independence from the storage backend, additional preprocessing etc.)

Disadvantages

  • We have to implement and test it ourselves (not a real disadvantage, but more time is required to make sure everything works correctly)
  • Replay and recovery are more complicated and likely less performance/memory efficient (due to the independent-streams nature)

Hybrid approach

  • Using materialized views for some queries and custom processing logic for precomputation of more complex views such as AllPersistenceIds

Is there anything I missed?

I think both solutions are relevant. As mentioned previously we need to consider conceptual correctness, but also use cases, tradeoffs and requirements of persistence query (which are not as strict in terms of guarantees or even which queries must be supported). It would be great to discuss and drive the decision making so we can proceed with the implementation. (I will have quite some time over the next weeks to contribute :))

@krasserm
Copy link
Owner

@zapletal-martin thanks for your spot-on analysis. At least for the projects I work(ed) on

  • No causal consistency guarantee
  • No same result for multiple queries

would be a blocker. Furthermore, I also think that

independence from the storage backend, additional preprocessing etc.

is an important aspect for the long-term evolution of akka-persistence-xxx storage plugins. For example, using a Kafka/Cassandra hybrid where raw logs (written by journal actors) are stored in Kafka topic partitions and indices in Cassandra might be an interesting solution.

I also like the hybrid approach you proposed, using MVs where appropriate (e.g. AllPersistenceIds) but a custom solution where clear semantics (causality) and repeatability of query results are important.

Here's my +1 for a focus on a custom solution, using a hybrid where appropriate.

@patriknw
Copy link
Collaborator

I have also put together a prototype of EventsByTag query using materialized view. I see no blockers for the needs I have in my current project.

No causal consistency guarantee

I think I can achieve the right replay order for each persistenceId without too much trouble. As I understand it, you want more causality guarantees across different persistenceIds. With the custom indexer you have outlined you can achieve causality per writing journal instance. To be honest, I think it can be dangerous to rely on something like that because it's not location transparent and would not fit well with Cluster Sharding. Have I misunderstood something?

I don't think the AllPersistenceIds query is very interesting, and it is not something we need to implement if we have the much more interesting EventsByTag query.

That said, I agree that a custom solution gives more flexibility and power. I suggest that @zapletal-martin continues working on that and I can put together a first EventsByTag based on materialized views. It should not be much more than a few days' work, and that is in line with my budget. I don't mind if it is later replaced with something more powerful.

I'm also happy to help out with reviewing, if piecemeal pull requests are created.

@krasserm
Copy link
Owner

@patriknw @zapletal-martin sound good to me!

Regarding

I think it can be dangerous to rely on something like that because it's not location transparent and would not fit well with Cluster Sharding. Have I misunderstood something?

I let @zapletal-martin answer that :-)

@zapletal-martin
Copy link
Collaborator Author

I think I can achieve the right replay order for each persistenceId without too much trouble. As I understand it, you want more causality guarantees across different persistenceIds.

Replay order is not an issue if you are storing events partitioned by persistenceId, as we do currently. I believe the goal is to achieve causality control for view precomputation, e.g. per persistenceId or tag, so you avoid issues such as a first query returning events 1,2,4 and the same query run again returning 1,2,3,4.

To be honest, I think it can be dangerous to rely on something like that because it's not location transparent and would not fit well with Cluster Sharding. Have I misunderstood something?

I am not sure I fully understand your comment. I assume that by location transparency you mean that an actor can live in a cluster and is referenced transparently, whereas a journal is 'physical' and may therefore be added/removed and actors rebalanced. That is true, but something we accounted for. The journal must have a unique id, which should however be reusable. From an external viewer's perspective I think it is location transparent, or the location transparency is achieved in the stream merging mechanism where the independent streams are reconciled. I do not see any issue regarding Cluster Sharding (actually I use sharding in all my tests). I may be missing something though.
I am not sure how performance compares to Cassandra's MVs due to limited control over the merging locality. But I think the merging can be sharded (by persistenceId, tag, ...) if that proves to be a bottleneck (it is a single actor at the moment). That was something I wanted to review during the 'PoC' phase.

That said, I agree that a custom solution gives more flexibility and power. I suggest that @zapletal-martin continues working on that and I can put together a first EventsByTag based on materialized views. It should not be much more than a few days' work, and that is in line with my budget. I don't mind if it is later replaced with something more powerful.

@krasserm, @patriknw sounds good. There are some conflicts that may need changes and migration (e.g. different table definitions, replay etc.), but that is expected. Feel free to use any of the existing code from the wip-persistence-query branch.

The read journal, configuration classes, query actors etc. are there and are working (eventsByPersistenceId is done and eventsByTag should not be too different using the existing base class), tested, and could easily be reused.

Also @patriknw I would be more than happy to help, so please let me know if I can contribute (e.g. the read side, which is partially done already in wip-persistence-query, or other parts that require new work such as replay etc.)!

What I am thinking at the moment is that I could help Patrik build eventsByPersistenceId and eventsByTag using MVs with the aforementioned tradeoffs, which should be relatively straightforward, and we can then take our time to build the PoC and assess its correctness, performance and scalability.

@patriknw
Copy link
Collaborator

Let me clarify with an example. We have persistent actors A and B.

  1. persist event a1 and a2 in A
  2. send a command from A to B
  3. when receiving the command persist b1 and b2 in B

All these events are tagged with the same tag. You want to guarantee causal consistency so that EventsByTag returns a stream with events in order a1, a2, b1, b2 if A and B were using the same journal instance, but if A and B were not using the same journal instance (i.e. located on different cluster nodes) you can't guarantee that b1 comes after a2 in the query stream.

Therefore I challenge the usefulness of the causal consistency per journal that you seem to find so important.

The order a1 -> a2 and b1 -> b2 must of course be maintained.

@krasserm
Copy link
Owner

The order a1 -> a2 and b1 -> b2 must of course be maintained.

This cannot be guaranteed when making a query by tag from the MV. This may happen if

  • the journal writes a1 then a2 and a3
  • Cassandra updates the MV with a1 then a3 and then a2 (independent async updates as mentioned here).
  • a read between the 2nd and 3rd MV update returns a1, a3, which violates causal consistency because a3 is visible before a2.

This is what we mean when talking about causal consistency. We do not aim to achieve causal consistency as Eventuate does, for example.

@patriknw
Copy link
Collaborator

Then I'm sorry that I completely misunderstood what you were referring to with causal consistency. I thought you meant across different persistenceIds.

I'm aware of the async nature of MVs and my plan is to solve that on the read side using the sequence number per persistenceId.
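A minimal sketch of that read-side fix (assumed, simplified shapes; a real implementation would also bound the buffer and handle gaps that never fill): buffer out-of-order events per persistenceId and emit an event only when its sequence number is the next expected one.

```scala
final case class Event(persistenceId: String, seqNr: Long)

// Releases events of each persistenceId strictly in sequence-number order,
// so an MV row that "overtakes" an earlier one is held back until the gap fills.
final class SequenceNrReorderBuffer {
  private val expected = scala.collection.mutable.Map.empty[String, Long]
  private val pending  = scala.collection.mutable.Map.empty[String, Map[Long, Event]]
  private val emitted  = scala.collection.mutable.ArrayBuffer.empty[Event]

  def offer(e: Event): Unit = {
    pending(e.persistenceId) =
      pending.getOrElse(e.persistenceId, Map.empty[Long, Event]) + (e.seqNr -> e)
    drain(e.persistenceId)
  }

  // Emit consecutive sequence numbers starting from the next expected one.
  private def drain(pid: String): Unit = {
    var next = expected.getOrElse(pid, 1L)
    var buf = pending.getOrElse(pid, Map.empty[Long, Event])
    while (buf.contains(next)) {
      emitted += buf(next)
      buf -= next
      next += 1
    }
    expected(pid) = next
    pending(pid) = buf
  }

  def delivered: Seq[Event] = emitted.toSeq
}
```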

@krasserm
Copy link
Owner

No problem. Causal consistency is usually discussed in the context of what you described. We've just used the more general definition here: an effect must never be observed before its (potential) cause.

@zapletal-martin
Copy link
Collaborator Author

Therefore I challenge the usefulness of the causal consistency per journal that you seem to find so important.
you can't guarantee that b1 comes after a2 in the query stream.

For persistenceId we have the sequenceNr, so we can achieve causality. There is no ordering defined across the journal instance "streams", so your example is right for the tag query. I wonder if there is a way to infer a single total order for tags (any repeatable order that preserves causality per persistenceId).

I'm aware of the async nature of MVs and my plan is to solve that on the read side using the sequence number per persistenceId.

That sounds like a good idea to me. You may be facing some of the same difficulties that we are (causal merging, a potentially unbounded number of persistenceIds etc.). This case however is simpler, because you shouldn't have to worry about failures, resuming the stream etc.

@patriknw I also sent you an email seeking cooperation. I think I could help you with the effort and potentially avoid doing quite some of the work that we already did in our branch.

Thinking about it, the stream merging logic we have could also be reused for events-by-tag query causality.

@krasserm would it be worth creating a separate branch for this work again?

@krasserm
Copy link
Owner

would it be worth creating a separate branch for this work again?

Sounds good to me.

@patriknw
Copy link
Collaborator

I'll create the branch tomorrow morning and open the first pull request.

@zapletal-martin
Copy link
Collaborator Author

@krasserm @patriknw I have created wip-materialized-views-persistence-query branch.

I have also created #124 for review, which contains the bits that I think are relevant to the implementation using materialized views. Some of them are duplicated in Patrik's branch, which is fine, but in my opinion some of the code is quite valuable and it would be great if it were reused (see my comments in the review). Let's have a discussion tomorrow and define the implementation plan and work distribution.

@patriknw
Copy link
Collaborator

I have implemented eventsByTag using materialized views. It's ready as far as I can see. Please review.
Let's discuss in the PR. #128

@patriknw
Copy link
Collaborator

@zapletal-martin I have now migrated this (and sub-issues) to akka/akka-persistence-cassandra.
