New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce Projections spi #2727
Conversation
00b7388
to
9dc2be2
Compare
@@ -21,7 +21,7 @@ import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor.ReadSideHandler | |||
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession | |||
import com.lightbend.lagom.javadsl.persistence.AggregateEvent | |||
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag | |||
import com.lightbend.lagom.javadsl.persistence.Offset | |||
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was getting a bit tired of so many Offset
types in the code and having to scroll to the imports every time to see what Offset
was the code referring to (Lagom's or Akka Persistence Query) so I started using aliases anywhere I would step into.
This change makes the PR bigger than it should but hopefully also makes the PR more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really good. Thanks for doing that. It's making everything much easier to read and helps to review this PR.
@InternalStableApi | ||
private[lagom] def completedProcessing(offset: Future[Offset], exCtx: ExecutionContext): Future[Offset] = offset | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main change in the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private[lagom] def startProcessing(system: ActorSystem, tagName: String, envelope: EventEnvelope):
This is invoked as soon as an event is read from the journal. See AbstractPersistentnEntityRegistry.eventStream
private[lagom] def afterUserFlow(projectionName: String, offset: Offset): Offset = offset
This is invoked, if possible, immediately after completing the user code in the stream. This is sometimes not possible (depends on the projection implementation).
private[lagom] def completedProcessing(offset: Future[Offset], exCtx: ExecutionContext): Future[Offset]
This is invoked when the offset has been persisted.
private[lagom] def failed(actorSystem: ActorSystem, projectionName: String, partitionName: String): ```
This is invoked is a failure traverses the stream on the porjection.
def offsetToDslOffset(offset: Offset): LagomJavaDslOffset = offset match { | ||
case TimeBasedUUID(uuid) => LagomJavaDslOffset.timeBasedUUID(uuid) | ||
case Sequence(value) => LagomJavaDslOffset.sequence(value) | ||
case NoOffset => LagomJavaDslOffset.NONE | ||
case _ => throw new IllegalArgumentException("Unsuppoerted offset type " + offset.getClass.getName) | ||
case _ => throw new IllegalArgumentException("Unsupported offset type " + offset.getClass.getName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo
import com.lightbend.lagom.javadsl.persistence.AggregateEvent | ||
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag | ||
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor | ||
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ReadSideActor without the Offset
aliases was impossible to reason about. When I added the userFlowWrapper
it was too confusing.
@@ -53,11 +66,11 @@ private[lagom] object ReadSideActor { | |||
* Read side actor | |||
*/ | |||
private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]]( | |||
tagName: String, | |||
workerCoordinates: WorkerCoordinates, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a pending improvement: having both the projection name and the tag name inside the actor.
The need to report the projection name in the metrics finally twisted my arm into implementing it.
|
||
scaladsl.Source | ||
.fromFuture(futureOffset) | ||
.future(futureOffset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was already implemented in #2614 but that PR was wrongfully targeting master
. When forward merging 1.6.x
into master
this will CONFLICT.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess not since it's exactly the same change.
|
||
unzip.out0 ~> wrappedFlow ~> zip.in0 | ||
unzip.out1 ~> zip.in1 | ||
zip.out ~> metricsReporter.in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main change in ReadSide processors. The user-provided flow (which is user code inside a ReadSideHandler
so both user statements and OffsetStore statments) is now wrapped and a side-channel for the offset to propagate alongside the userflow exists. The side-channel zips back and produces an Offset
for the metrics.
project/Dependencies.scala
Outdated
@@ -28,7 +28,7 @@ object Dependencies { | |||
val Akka: String = sys.props.getOrElse("lagom.build.akka.version", "2.6.3") // sync with docs/build.sbt | |||
val AkkaHttp = "10.1.11" | |||
|
|||
val AkkaPersistenceCassandra = "0.102" | |||
val AkkaPersistenceCassandra = "0.103-SNAPSHOT" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already fixed in #2758
@@ -2,7 +2,7 @@ | |||
* Copyright (C) Lightbend Inc. <https://www.lightbend.com> | |||
*/ | |||
|
|||
package com.lightbend.lagom.internal.cluster.projections | |||
package com.lightbend.lagom.internal.projection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved class to the appropriate package. Standardized to using singular.
(tuple._1, o) | ||
} | ||
.via(eventPublisherFlow) | ||
.map(o => ProjectionSpi.completedProcessing(o, context.dispatcher)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because of how TopicProducer
works, the changes in TopicProducerActor
differ from the changes in ReadSideActor
implementations above.
Here, we don't have a side channel to propagate the Offset
because the offset propagates already. But also because in a Topic
Flow
users may decide to filter
events in which case using a side-channel becomes useless.
9dc2be2
to
8ed60c4
Compare
private[lagom] def afterUserFlow(projectionName: String, offset: Offset): Offset = offset | ||
|
||
@InternalStableApi | ||
private[lagom] def completedProcessing(offset: Future[Offset], exCtx: ExecutionContext): Future[Offset] = offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems slightly strange: is there value in capturing both the moment completedProcessing
is called and the moment offset
resolves? Or is there something around context propagation that means context must be captured before offset
resolves?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I completely missed that. Thanks @raboof
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
scaladsl.Source | ||
.fromFuture(futureOffset) | ||
.future(futureOffset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess not since it's exactly the same change.
.../javadsl/src/main/scala/com/lightbend/lagom/internal/javadsl/persistence/ReadSideActor.scala
Outdated
Show resolved
Hide resolved
...fka/server/src/main/scala/com/lightbend/lagom/internal/broker/kafka/TopicProducerActor.scala
Outdated
Show resolved
Hide resolved
...fka/server/src/main/scala/com/lightbend/lagom/internal/broker/kafka/TopicProducerActor.scala
Outdated
Show resolved
Hide resolved
@@ -21,7 +21,7 @@ import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor.ReadSideHandler | |||
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession | |||
import com.lightbend.lagom.javadsl.persistence.AggregateEvent | |||
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag | |||
import com.lightbend.lagom.javadsl.persistence.Offset | |||
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really good. Thanks for doing that. It's making everything much easier to read and helps to review this PR.
No description provided.