Issue 48 atomic updates #69

Merged (2 commits) on Sep 1, 2015
Conversation

@chbatey (Collaborator) commented Jul 27, 2015

Hi @krasserm - this will need some clean up; I just wanted some feedback / early discussion on the schema changes. I'll remove all the logging - I was just making 100% sure that I understood how/when akka-persistence calls into the plugin.

I removed the marker column and added a static boolean column called inUse. It will be set on any write, and also when we skip an entire partition due to a large persistAll. It is used for scans (like the marker used to be) to make sure we keep scanning even if entire partitions have been deleted or skipped.

I removed COMPACT STORAGE as it would limit our future schema changes, and the space issue is being addressed in the next version of Cassandra (https://issues.apache.org/jira/browse/CASSANDRA-8099).
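For illustration, the table could now look roughly like this (sketch only; column names and layout are placeholders, not the exact DDL in this PR):

```scala
// Illustrative sketch, not the actual statement from this PR.
// A static column is shared by all rows of a (persistence_id, partition_nr) partition,
// so a single write can mark the whole partition as "in use" even if events get skipped.
def createTableStatement(keyspace: String, table: String): String = s"""
  CREATE TABLE IF NOT EXISTS $keyspace.$table (
    persistence_id text,
    partition_nr bigint,
    sequence_nr bigint,
    message blob,
    in_use boolean static,
    PRIMARY KEY ((persistence_id, partition_nr), sequence_nr))
"""
```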

For atomicity I went with an approach similar to the one discussed on the issue:

  1. If an AtomicWrite spans a partition boundary it is put entirely in the higher partition.
  2. Fail fast if someone tries to persist an AtomicWrite that spans three partitions. Knowing a sequence number can be at most one partition higher than expected makes finding it easier. I'll be impressed if someone tries to persist an AtomicWrite that is 5-10M in size :)

So the trade-off is that we may end up with slightly varied partition sizes, but I don't see that as an issue; it will only be noticeable if people have huge AtomicWrites.
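Roughly, the placement rule looks like this (illustrative sketch only; names like partitionSize are placeholders, not the exact code in this PR):

```scala
// Default partition for a sequence number.
def partitionNr(sequenceNr: Long, partitionSize: Long): Long =
  (sequenceNr - 1L) / partitionSize

// For an AtomicWrite covering sequence numbers [firstSeqNr, lastSeqNr]:
// if it crosses a partition boundary it is written entirely to the higher partition;
// anything spanning more than two partitions is rejected up front.
def writePartition(firstSeqNr: Long, lastSeqNr: Long, partitionSize: Long): Long = {
  val first = partitionNr(firstSeqNr, partitionSize)
  val last  = partitionNr(lastSeqNr, partitionSize)
  require(last - first <= 1, s"AtomicWrite too large: spans ${last - first + 1} partitions")
  last
}
```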

TODO:

  • Handle deletes for a seqNr that got moved to the next partition. We'll either need a read-then-write to see where it is, or delete both possible locations. WDYT? I generally try to avoid read-then-write due to race conditions and the perf cost of extra round trips, but the double delete adds unnecessary tombstones.

A couple of other things:

  • Changed the tests to use UUIDs as persistenceIds, as I kept adding tests in the middle :)

@chbatey (Collaborator, Author) commented Jul 28, 2015

I added a test for the scenario mentioned in the TODO section and fixed it via a double delete.

new MessageIterator(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(msg => {
    replayCallback(msg)
  })
}
@chbatey (Collaborator, Author) commented on this diff:

Unintended changes from adding and then removing logging; will remove.

@krasserm (Owner) commented Aug 1, 2015

Hi @chbatey thanks for the pull request and sorry for the late reply.

The mechanism for varying partition sizes so that a batch fits into a single partition looks good to me. However, ensuring that only AtomicWrites fit into a single partition is not enough to fix #48: it should be the whole Seq[AtomicWrite] that is passed as the messages batch to asyncWriteMessages.

The messages batch comes from a single PersistentActor which accumulates AtomicWrites internally before sending them to the journal. If the messages batch crosses a partition, you can still end up reading a partially applied batch (leaving the PersistentActor in an incomplete/incorrect state).

@krasserm (Owner) commented Aug 1, 2015

Regarding the name maxPartitionSize: in the worst case we can end up with partition sizes that are 2 * maxPartitionSize. Wouldn't a rename to minPartitionSize make more sense (and document that a partition cannot grow beyond 2 * minPartitionSize)? Alternatively, we keep maxPartitionSize and use maxPartitionSize / 2 for internal calculations. WDYT?
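To illustrate the worst case with made-up numbers: with maxPartitionSize = 10, partition 1 would normally hold seqNrs 11..20; an AtomicWrite covering seqNrs 2..11 crosses the boundary and is therefore written entirely to partition 1, which then ends up holding seqNrs 2..20, i.e. 19 rows, close to 2 * maxPartitionSize.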

-     val stmt =
-       preparedDeletePermanent.bind(mid.persistenceId, partitionNr(mid.sequenceNr): JLong, mid.sequenceNr: JLong)
+     val firstPnr: JLong = partitionNr(mid.sequenceNr)
+     val stmt = preparedDeletePermanent.bind(mid.persistenceId, firstPnr: JLong, mid.sequenceNr: JLong)
+     // the message could be in next partition as a result of an AtomicWrite, alternative is a read before write
@krasserm (Owner) commented on this diff:

Would a read-before-delete be done for every message or can this be implemented more efficiently? If we could implement that with a single read per partition (or n reads where n is a small constant) I'd prefer going for that approach. We only append data, so race conditions shouldn't be an issue. If we need a read for every message, we should go for the redundant-delete approach (as users might delete millions of old messages with a single request).

@chbatey (Collaborator, Author) replied:

Agreed - let me change this and maybe add another static field we can read to tell if we've moved sequence numbers into the next partition.


@krasserm (Owner) commented Aug 1, 2015

+1 for the schema changes.

@chbatey (Collaborator, Author) commented Aug 1, 2015

Regarding the property name.

I think the normal case is not using atomic writes, and the varying only happens when an atomic write spans a partition boundary, so we should use a config name that describes the case where no span happens, e.g. minPartitionSize or perhaps targetPartitionSize.

@chbatey (Collaborator, Author) commented Aug 1, 2015

My interpretation is that atomic writes are when the actor uses persistAll, and we're only meant to guarantee atomicity for those messages. The use case is that one command produces multiple events, so recovering only a subset of the events from a persistAll would leave the actor in an inconsistent state.

The seq is an internal optimisation for persisting many calls to persistAsync in a single trip to the journal plugin.
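As a quick (hypothetical) illustration of the persistAll case, not code from this plugin: one command produces several events that must be recovered all-or-nothing.

```scala
import akka.persistence.PersistentActor

// Hypothetical example: one command -> several events, stored as a single AtomicWrite.
case class AddItems(names: List[String])
case class ItemAdded(name: String)

class ShoppingCart extends PersistentActor {
  override def persistenceId: String = "cart-1"

  private var items: List[String] = Nil

  override def receiveCommand: Receive = {
    case AddItems(names) =>
      // all ItemAdded events from this command are stored atomically;
      // recovery will never replay only a subset of them
      persistAll(names.map(ItemAdded)) { event =>
        items = event.name :: items
      }
  }

  override def receiveRecover: Receive = {
    case ItemAdded(name) => items = name :: items
  }
}
```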

The language changed quite a bit for M2; here is the snippet for AtomicWrites:

Atomic writes
Each event is of course stored atomically, but it is also possible to store several events atomically by using the persistAll or persistAllAsync method. That means that all events passed to that method are stored or none of them are stored if there is an error.

The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by persistAll.

And from the ScalaDoc of AsyncWriteJournal:

Plugin API: asynchronously writes a batch (Seq) of persistent messages to the journal. The batch is only for performance reasons, i.e. all messages don't have to be written atomically. Higher throughput can typically be achieved by using batch inserts of many records compared to inserting records one-by-one, but this aspect depends on the underlying data store and a journal implementation can implement it as efficient as possible with the assumption that the messages of the batch are unrelated.

WDYT? Worth checking with @ktoso

@krasserm (Owner) commented Aug 1, 2015

You're right. I wrongly assumed that we have logged C* batches but that's not the case for a single-partition batch-write (which is then atomic and isolated). Great implementation then, and sorry for the noise 😉.

@krasserm (Owner) commented Aug 1, 2015

we should use a config name that describes the case where no span happens, e.g. minPartitionSize or perhaps targetPartitionSize

+1 for targetPartitionSize

@ktoso commented Aug 1, 2015

/me catching up with the discussion here, I like what I see! :-)
Seems we're all on the same page already, but allow me to ACK those points so we're confident in the semantics :-)

  1. Fail fast if someone tries to persist an AtomicWrite that spans three partitions. Knowing a sequence number can be at most one partition higher than expected makes finding it easier. I'll be impressed if someone tries to persist an AtomicWrite that is 5-10M in size :)

Yeah, I don't think that will happen, and if it does it's a user error - feel free to reject such large atomic writes with "hey, that's crazy large!" ;-)

My interpretation is that atomic writes are when the actor uses persistAll and we're only meant to guarantee atomicity for these messages.

Correct. The atomicity guarantee is only about that group of events. The Seq[AtomicWrite] could contain two atomic writes originating from persistAll(); persistAll(), and atomicity should be guaranteed for each of them independently, i.e. the first AtomicWrite can succeed while the 2nd fails - that's OK. If someone wants "all together" they must fit it explicitly into one persistAll so that it's explicit what needs to be atomically written.

With the use case that one command produces multiple events so recovering a subset of the events from a persistAll would leave the actor in an inconsistent state.

The seq is an internal optimisation for persisting many calls to persistAsync in a single trip to the journal plugin.

Correct again; the seq is there to spare multiple round trips to the journal plugin - instead we batch them up and send them together (this allows batch inserts in certain datastores).

The issue #48

I see what edge case is meant there, but I don't think it relates to this PR since here we're talking about the AtomicWrite, which we can keep in one partition (as I understood).

I'll dive into code later on, looks great from what I've skimmed so far!

@krasserm (Owner) commented Aug 2, 2015

@ktoso @chbatey I agree with all statements made regarding atomicity but after thinking more about the specification, I see an issue with the assumption made in the AsyncWriteJournal docs that

... the messages of the batch are unrelated

This is actually not the case. From a business logic perspective, later messages in the messages batch may actually depend on earlier messages. For example, later messages in the batch may have been generated during command processing in a PersistentActor whose state is f(earlier messages).

Consequently, this means that these messages must not be written independently to Cassandra (btw, this problem is unrelated to the atomicity of AtomicWrites). However, we are currently writing these messages independently: each element of groupedStatements is mapped to an asynchronous C* batch execution, where different batches may potentially be written to different partitions. These batch writes may fail independently, so we may end up in a situation where the batch write of earlier messages fails but that of later messages succeeds. This may result in corrupt actor state.

To avoid this, we need to ensure that earlier messages have been successfully written to C* before newer messages are written. The easiest way to achieve this is to write the whole messages batch as a single C* batch to a single partition.
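As a rough sketch of what I mean (illustrative only; the statement and parameter names are assumptions, not this PR's code), the whole batch could be bound against one partition_nr and executed as one unlogged batch:

```scala
import java.lang.{ Long => JLong }
import java.nio.ByteBuffer
import com.datastax.driver.core.{ BatchStatement, PreparedStatement, Session }

// Illustrative sketch: write the whole messages batch as one single-partition,
// unlogged C* batch, which Cassandra applies atomically and in isolation.
def writeBatchToSinglePartition(
    session: Session,
    preparedWrite: PreparedStatement,
    persistenceId: String,
    partitionNr: Long,
    messages: Seq[(Long, ByteBuffer)]): Unit = {
  val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
  messages.foreach { case (sequenceNr, payload) =>
    batch.add(preparedWrite.bind(persistenceId, partitionNr: JLong, sequenceNr: JLong, payload))
  }
  session.execute(batch)
}
```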

Thoughts?

@ktoso commented Aug 2, 2015

Each element of groupedStatements is mapped to an asynchronous C* batch execution where different batches may potentially be written to different partitions. These batch writes may fail independently, so we may end up in a situation where the batch write of earlier messages fails but that of later messages succeeds. This may result in corrupt actor state.

Good catch - indeed that would be a problem. I'll open a ticket to change the wording in the docs as well.

The solution you propose looks good.
The only complication is that the plugin must check if the entire batch can fit in the partition, and if not pick a higher one, right (as this PR already does for AtomicWrites)?

@krasserm (Owner) commented Aug 2, 2015

Right, same algorithm applied to the whole messages batch.

I should mention that we may run into these problems only when using persistAsync(). I used that in the past in combination with actor state that has been tentatively updated with events whose persistence is in progress. This allowed me to take advantage of eager command validation and event batching for increased throughput. Tentative updates are then either confirmed or rolled back in memory depending on the persistence outcome. This worked pretty well and I have also seen other applications doing that. However, I agree that this is a somewhat exotic use case.

We also do not run into these problems when using persistAsync() if actor state is only updated from successfully persisted events, because none of the events in a messages batch (before invocation of writeMessagesAsync) have yet been applied to the current state. So later messages in the batch cannot depend on earlier ones in this case. When using persist() or persistAll() the messages batch has only one entry anyway, so it's not an issue there either.

In my opinion, the best solution would be to get rid of persistAsync() altogether and implement batching at the journal level, as mentioned in my blog post comparing Akka Persistence with Eventuate and as implemented in Eventuate. In that case, however, journals must be prepared for a messages batch that may contain events with different persistenceIds.

tl;dr: for the majority of use cases the current PR is good enough. If persistAsync() is removed/deprecated then the PR covers all use cases.

@chbatey (Collaborator, Author) commented Aug 3, 2015

Great point @krasserm - we can either do as you said, or chain atomic writes for the same persistenceId to ensure a batch isn't executed until the previous one succeeds. Or, at the akka-persistence level, loosen the constraints for persistAsync (but I don't think that is a useful programming model).

I'll update this PR to merge atomic writes for the same persistenceId in the meantime.

@patriknw (Collaborator) commented Aug 3, 2015

I agree that batching across different persistenceIds is very interesting and should be possible to add with the current API. I don't think we even say how the batching is performed or that all messages in the batch come from the same persistenceId, so if a journal is basing something on that, it is only based on vague implementation assumptions.

That kind of batching should increase the system's overall throughput when using many persistent actors, which is also what we think is most important. However, it is not a replacement for persistAsync in high-throughput use cases for a single persistent actor.

I don't fully understand your concern about persistAsync and why we should remove it. The tentative update of the state is indeed exotic. The new failure handling that stops the actor unconditionally in case of storage failures might influence the design of such usage?

@krasserm (Owner) commented Aug 4, 2015

Akka Persistence should definitely offer functionality to allow batching (high throughput) for a single writer. I just think that PersistentActor is the wrong place to provide that functionality. Currently, when using persistAsync() you cannot validate commands against current actor state anyway because that state is stale (= eventually consistent with the event log). So persistAsync() is typically used by stateless writers. A stateless writer doesn't need to be an actor, and Akka Persistence should offer something like

trait PersistentWriter {
  def persistenceId: String
  def persistAsync(...): Future[...] = { ... }
}

for writing events to the journal. To derive state from the written events, applications should use a PersistentView, for example. With this design, you've properly separated a stateless event writer from a stateful event aggregator. Should an application still need access to transient state while writing, it can still wrap the PersistentWriter into an actor.

In my opinion, the main idea behind PersistentActor is to have a strong consistency boundary around persistent state. This requires actor state to be in sync with the event log and this can only be achieved by using persist() or persistAll().

A separate PersistentWriter should also support batching for a single writer (persistenceId). With this separation of concerns you could also remove all the batching code from Eventsourced which would significantly simplify its implementation. Batching across different PersistentActors can still be implemented with a separate batching layer between PersistentActor and the journal.

I think the main reason that persistAsync() still exists on PersistentActor is (legacy) support for command-sourcing. Making PersistentActor an event-sourcing-only solution would not only simplify its implementation but also its specification. Command-sourcing use cases could still be supported with PersistentWriter if needed.

The new failure handling that stops the actor unconditionally in case of storage failures might influence the design of such usage?

Yes it would. And it would be great if you could make the unconditional stop conditional, to make migration of older apps possible. Anyway, that sort of failure handling is most useful if actor state is in sync with the journal, i.e. for a PersistentActor that doesn't offer persistAsync(). In that case unconditional stops make complete sense.

However, it doesn't really make sense for PersistentWriter use cases such as command-sourcing. You could still do that by reading the last event from the journal after a failure, if needed, but there are also many applications that are duplicate-tolerant, for example.

This is another reason why a separation of concerns would make sense. Having a separate PersistentWriter, the tentative update use case could be further supported by mixing PersistentWriter into a view, without being affected by the unconditional-stop-change for PersistentWriter.

@krasserm (Owner) commented Aug 4, 2015

do as you said, or chain atomic writes for the same persistenceId to ensure a batch isn't executed until the previous one succeeds.

I think chaining would be even better as we may run into the very same issue for multiple writeMessagesAsync() calls.
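A rough sketch of such chaining (illustrative only; executeBatch is a placeholder, not the plugin's actual API):

```scala
import scala.concurrent.{ ExecutionContext, Future }

// Illustrative sketch: execute batch N+1 only after batch N has completed successfully,
// so earlier writes for a persistenceId can never be missing while later ones are present.
def writeChained[A](batches: Seq[A])(executeBatch: A => Future[Unit])
                   (implicit ec: ExecutionContext): Future[Unit] =
  batches.foldLeft(Future.successful(())) { (previous, batch) =>
    previous.flatMap(_ => executeBatch(batch))
  }
```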

Or, at the akka-persistence level, loosen the constraints for persistAsync (but I don't think that is a useful programming model)

See my previous comment for a detailed description of why persistAsync() should be removed from PersistentActor.

@patriknw (Collaborator) commented Aug 4, 2015

Martin, thanks for the detailed clarification. PersistentWriter is an interesting idea. Perhaps it could be stream based? I will discuss this topic with Konrad in the next few days.

persistAsync allows something other than high-throughput command sourcing: it makes it possible for the actor to handle other incoming messages that don't need fully consistent state while persisting.

@krasserm (Owner) commented Aug 4, 2015

Perhaps it could be stream based?

Yeah, that would really make sense.

persistAsync allows something other than high-throughput command sourcing: it makes it possible for the actor to handle other incoming messages that don't need fully consistent state while persisting.

In this case, PersistentActor should additionally use a PersistentWriter. From an implementation perspective, it should be easy to ensure that the relative order of writes, when coming from the same actor (or persistenceId), is preserved.

With this combination of PersistentActor and PersistentWriter we'd have what PersistentActor currently provides, but this is only needed for some exotic/rare use cases where validation against eventually consistent internal state is needed. Do you have a real-world example of that? I have trouble imagining one ...

@patriknw (Collaborator) commented:

After some thinking and discussions we have concluded that for 2.4 we will not touch persistAsync. The PersistentWriter idea can be explored more later. Since streams are still experimental we cannot incorporate them in standard akka-persistence now anyway.

@chbatey (Collaborator, Author) commented Aug 17, 2015

Reminder of what needs doing on this:

  1. Change property name to targetPartitionSize
  2. Come up with a way of avoiding the redundant deletes
  3. Chain writes for the same persistence id

@chbatey (Collaborator, Author) commented Aug 18, 2015

Regarding 3): @krasserm, for this PR I am planning on just putting them in the same partition. When we do M3 and have the persistenceId in the AtomicWrite I'll change it. I also want to add some extra features to stubbed cassandra (discussed here: scassandra/scassandra-server#103) so we can actually test that batch N+1 isn't executed until N is complete. At the same time I'll add tests for all the failure scenarios, as stubbed cassandra can deterministically create them all.

@krasserm (Owner) commented:

@chbatey +1 to your plan. Also, would love to see stubbed cassandra being used for testing.

@chbatey changed the title from "WIP: Issue 48 atomic updates" to "Issue 48 atomic updates" on Aug 22, 2015
@chbatey (Collaborator, Author) commented Aug 22, 2015

Hi @krasserm, I've updated this to do a single read per partition before starting deletes, i.e.

select inuse, sequence_nr from messages where persistence_id = ? and partition_nr = ? order by sequence_nr desc limit 1

This results in no unnecessary tombstones. Each partition's deletes are then split into batches based on the max delete size.
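Roughly the shape of it (illustrative sketch only; the query helpers are placeholders passed in as parameters, not the exact code in this PR):

```scala
import scala.concurrent.{ ExecutionContext, Future }

// Illustrative sketch: one read per partition finds the highest stored sequence_nr,
// and deletes are only issued up to that value, split into fixed-size batches,
// so no tombstones are created for rows that were never written to the partition.
def deleteInPartition(
    highestSequenceNr: () => Future[Option[Long]], // the single "select ... limit 1" read
    deleteBatch: Seq[Long] => Future[Unit],        // executes one batch of deletes
    fromSequenceNr: Long,
    toSequenceNr: Long,
    maxDeleteBatchSize: Int)(implicit ec: ExecutionContext): Future[Unit] =
  highestSequenceNr().flatMap {
    case None => Future.successful(()) // partition only marked in-use, nothing to delete
    case Some(highest) =>
      val seqNrBatches = (fromSequenceNr to math.min(highest, toSequenceNr))
        .grouped(maxDeleteBatchSize).toList
      seqNrBatches.foldLeft(Future.successful(())) { (prev, seqNrs) =>
        prev.flatMap(_ => deleteBatch(seqNrs))
      }
  }
```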

Given this is quite a significant change I'll wait for you to get back to review.


- This version of `akka-persistence-cassandra` depends on Akka 2.3.9 and is cross-built against Scala 2.10.4 and 2.11.6. It is compatible with Cassandra 2.1.0 or higher. Versions of the Cassandra plugins that are compatible with Cassandra 1.2.x are maintained on the [cassandra-1.2](https://github.com/krasserm/akka-persistence-cassandra/tree/cassandra-1.2) branch.
+ This version of `akka-persistence-cassandra` depends on Akka 2.4.M2 and is cross-built against Scala 2.10.4 and 2.11.6. It is compatible with Cassandra 2.1.0 or higher. Versions of the Cassandra plugins that are compatible with Cassandra 1.2.x are maintained on the [cassandra-1.2](https://github.com/krasserm/akka-persistence-cassandra/tree/cassandra-1.2) branch.
@krasserm (Owner) commented on this diff:

Akka 2.4-RC1

@krasserm (Owner) commented:

Great work, thanks a lot @chbatey. Most of my comments are only minor ones except the general comment on atomicity of deletes (which has already been an issue in all previous versions). Curious what you think ...

@chbatey (Collaborator, Author) commented Aug 31, 2015

Updated based on your comments @krasserm - let me know if it is okay and I'll do some squashing.

@krasserm (Owner) commented Sep 1, 2015

LGTM @chbatey

@chbatey (Collaborator, Author) commented Sep 1, 2015

Squashed into one commit for AtomicWrites and one commit for merging AtomicWrites for the same persistenceId.

@krasserm (Owner) commented Sep 1, 2015

Do you plan further updates on this PR or is it ready to merge (tracking further work by separate tickets)?

@chbatey (Collaborator, Author) commented Sep 1, 2015

Ready for merge; I'll raise a new issue to look at deletes, which I'll do before the 2.4 release. When are you planning on making master 2.4?

@krasserm (Owner) commented Sep 1, 2015

We should merge to master when the tickets in the Ready and In Progress columns are done (except #64, #77 and #79). WDYT?

krasserm added a commit that referenced this pull request on Sep 1, 2015
@krasserm merged commit da27b17 into krasserm:wip-akka-2.4 on Sep 1, 2015
@krasserm (Owner) commented Sep 1, 2015

Can you please close all tickets that are covered by this PR?

@krasserm modified the milestone: 0.4 on Sep 1, 2015
@chbatey (Collaborator, Author) commented Sep 1, 2015

Yep - I think #70 and #76 could be optional as they don't require schema changes.

@krasserm (Owner) commented Sep 1, 2015

+1

rafalsiwiec pushed a commit to rafalsiwiec/akka-persistence-cassandra that referenced this pull request on Mar 23, 2016: make C* test server more lightweight