New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce Projections spi #2727
Changes from 7 commits
602ec7b
fb05f2c
2780e38
8ed60c4
01ab9f9
c64a7ee
ea71401
d7b18a9
2eb082d
ab927a9
45c906a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* Copyright (C) Lightbend Inc. <https://www.lightbend.com> | ||
*/ | ||
|
||
package com.lightbend.lagom.internal.spi.projection | ||
|
||
import akka.actor.ActorSystem | ||
import akka.annotation.InternalStableApi | ||
import akka.persistence.query.EventEnvelope | ||
import akka.persistence.query.Offset | ||
|
||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.Future | ||
|
||
object ProjectionSpi { | ||
|
||
@InternalStableApi | ||
private[lagom] def startProcessing(system: ActorSystem, tagName: String, envelope: EventEnvelope): EventEnvelope = | ||
envelope | ||
|
||
@InternalStableApi | ||
private[lagom] def afterUserFlow(projectionName: String, offset: Offset): Offset = offset | ||
|
||
@InternalStableApi | ||
private[lagom] def completedProcessing(offset: Future[Offset], exCtx: ExecutionContext): Future[Offset] = offset | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems slightly strange: is there value in capturing both the moment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I completely missed that. Thanks @raboof |
||
|
||
@InternalStableApi | ||
private[lagom] def failed(actorSystem: ActorSystem, projectionName: String, partitionName: String): Unit = () | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the main change in the PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is invoked as soon as an event is read from the journal. See
This is invoked, if possible, immediately after completing the user code in the stream. This is sometimes not possible (depends on the projection implementation).
This is invoked when the offset has been persisted.
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
package com.lightbend.lagom.internal.javadsl.persistence | ||
|
||
import akka.annotation.InternalStableApi | ||
import akka.persistence.query.NoOffset | ||
import akka.persistence.query.Offset | ||
import akka.persistence.query.Sequence | ||
|
@@ -16,17 +17,21 @@ import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomJavaDslOffset } | |
* Converts between the Akka Persistence Query Offset type and the internal Lagom Java Offset type | ||
*/ | ||
object OffsetAdapter { | ||
|
||
@InternalStableApi | ||
def offsetToDslOffset(offset: Offset): LagomJavaDslOffset = offset match { | ||
case TimeBasedUUID(uuid) => LagomJavaDslOffset.timeBasedUUID(uuid) | ||
case Sequence(value) => LagomJavaDslOffset.sequence(value) | ||
case NoOffset => LagomJavaDslOffset.NONE | ||
case _ => throw new IllegalArgumentException("Unsuppoerted offset type " + offset.getClass.getName) | ||
case _ => throw new IllegalArgumentException("Unsupported offset type " + offset.getClass.getName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo |
||
} | ||
|
||
@InternalStableApi | ||
def dslOffsetToOffset(dslOffset: LagomJavaDslOffset): Offset = dslOffset match { | ||
case uuid: LagomJavaDslOffset.TimeBasedUUID => TimeBasedUUID(uuid.value()) | ||
case seq: LagomJavaDslOffset.Sequence => Sequence(seq.value()) | ||
case LagomJavaDslOffset.NONE => NoOffset | ||
case _ => throw new IllegalArgumentException("Unsuppoerted offset type " + dslOffset.getClass.getName) | ||
case _ => throw new IllegalArgumentException("Unsupported offset type " + dslOffset.getClass.getName) | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,25 +19,38 @@ import akka.stream.scaladsl | |
import akka.util.Timeout | ||
import akka.Done | ||
import akka.NotUsed | ||
import akka.japi | ||
import akka.persistence.query.{ Offset => AkkaOffset } | ||
import akka.stream.FlowShape | ||
import akka.stream.javadsl | ||
import akka.stream.scaladsl.Flow | ||
import akka.stream.scaladsl.GraphDSL | ||
import akka.stream.scaladsl.Unzip | ||
import akka.stream.scaladsl.Zip | ||
import com.lightbend.lagom.internal.persistence.ReadSideConfig | ||
import com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTask | ||
import com.lightbend.lagom.javadsl.persistence._ | ||
import com.lightbend.lagom.internal.projection.ProjectionRegistryActor.WorkerCoordinates | ||
import com.lightbend.lagom.internal.spi.projection.ProjectionSpi | ||
import com.lightbend.lagom.javadsl.persistence.AggregateEvent | ||
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag | ||
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor | ||
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This ReadSideActor without the |
||
|
||
import scala.compat.java8.FutureConverters._ | ||
import scala.concurrent.Future | ||
|
||
private[lagom] object ReadSideActor { | ||
def props[Event <: AggregateEvent[Event]]( | ||
tagName: String, | ||
workerCoordinates: WorkerCoordinates, | ||
config: ReadSideConfig, | ||
clazz: Class[Event], | ||
globalPrepareTask: ClusterStartupTask, | ||
eventStreamFactory: (AggregateEventTag[Event], Offset) => Source[akka.japi.Pair[Event, Offset], NotUsed], | ||
eventStreamFactory: (AggregateEventTag[Event], LagomOffset) => Source[akka.japi.Pair[Event, LagomOffset], NotUsed], | ||
processor: () => ReadSideProcessor[Event] | ||
)(implicit mat: Materializer) = | ||
Props( | ||
new ReadSideActor[Event]( | ||
tagName, | ||
workerCoordinates, | ||
config, | ||
clazz, | ||
globalPrepareTask, | ||
|
@@ -53,11 +66,11 @@ private[lagom] object ReadSideActor { | |
* Read side actor | ||
*/ | ||
private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]]( | ||
tagName: String, | ||
workerCoordinates: WorkerCoordinates, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a pending improvement: having both the projection name and the tag name inside the actor. |
||
config: ReadSideConfig, | ||
clazz: Class[Event], | ||
globalPrepareTask: ClusterStartupTask, | ||
eventStreamFactory: (AggregateEventTag[Event], Offset) => Source[akka.japi.Pair[Event, Offset], NotUsed], | ||
eventStreamFactory: (AggregateEventTag[Event], LagomOffset) => Source[akka.japi.Pair[Event, LagomOffset], NotUsed], | ||
processorFactory: () => ReadSideProcessor[Event] | ||
)(implicit mat: Materializer) | ||
extends Actor | ||
|
@@ -68,6 +81,8 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]]( | |
|
||
private var shutdown: Option[KillSwitch] = None | ||
|
||
val tagName = workerCoordinates.tagName | ||
|
||
override def postStop: Unit = { | ||
shutdown.foreach(_.shutdown()) | ||
} | ||
|
@@ -93,15 +108,38 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]]( | |
config.randomBackoffFactor | ||
) { () => | ||
val handler: ReadSideProcessor.ReadSideHandler[Event] = processorFactory().buildHandler() | ||
val futureOffset: Future[Offset] = handler.prepare(tag).toScala | ||
val futureOffset: Future[LagomOffset] = handler.prepare(tag).toScala | ||
|
||
scaladsl.Source | ||
.fromFuture(futureOffset) | ||
.future(futureOffset) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was already implemented in #2614 but that PR was wrongfully targeting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess not since it's exactly the same change. |
||
.initialTimeout(config.offsetTimeout) | ||
.flatMapConcat { offset => | ||
val eventStreamSource = eventStreamFactory(tag, offset).asScala | ||
val usersFlow = handler.handle() | ||
eventStreamSource.via(usersFlow) | ||
val eventStreamSource: scaladsl.Source[japi.Pair[Event, LagomOffset], NotUsed] = | ||
eventStreamFactory(tag, offset).asScala | ||
val userFlow: javadsl.Flow[japi.Pair[Event, LagomOffset], Done, _] = | ||
handler | ||
.handle() | ||
.asScala | ||
.watchTermination() { (_, right) => | ||
right.recoverWith { | ||
case _ => | ||
ProjectionSpi.failed( | ||
context.system, | ||
workerCoordinates.projectionName, | ||
workerCoordinates.tagName | ||
) | ||
right | ||
} | ||
} | ||
.asJava | ||
|
||
val wrappedFlow = Flow[japi.Pair[Event, LagomOffset]] | ||
.map { pair => | ||
(pair, OffsetAdapter.dslOffsetToOffset(pair.second)) | ||
} | ||
.via(userFlowWrapper(workerCoordinates, userFlow.asScala)) | ||
|
||
eventStreamSource.via(wrappedFlow) | ||
} | ||
} | ||
|
||
|
@@ -122,4 +160,29 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]]( | |
// This actor will be restarted by WorkerCoordinator | ||
throw cause | ||
} | ||
|
||
private def userFlowWrapper( | ||
workerCoordinates: WorkerCoordinates, | ||
userFlow: Flow[japi.Pair[Event, LagomOffset], Done, _] | ||
): Flow[(japi.Pair[Event, LagomOffset], AkkaOffset), AkkaOffset, _] = | ||
Flow.fromGraph(GraphDSL.create(userFlow) { implicit builder => wrappedFlow => | ||
import GraphDSL.Implicits._ | ||
val unzip = builder.add(Unzip[japi.Pair[Event, LagomOffset], AkkaOffset]) | ||
val zip = builder.add(Zip[Done, AkkaOffset]) | ||
val metricsReporter: FlowShape[(Done, AkkaOffset), AkkaOffset] = builder.add(Flow.fromFunction { | ||
e: (Done, AkkaOffset) => | ||
ignasi35 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// TODO: in ReadSide processor we can't report `afterUserFlow` and `completedProcessing` separately | ||
// as we do in TopicProducerActor, unless we moved the invocation of `afterUserFlow` to each | ||
// particular ReadSideHandler (C* and JDBC). | ||
ProjectionSpi.afterUserFlow(workerCoordinates.projectionName, e._2) | ||
ProjectionSpi.completedProcessing(Future(e._2), context.dispatcher) | ||
e._2 | ||
}) | ||
|
||
unzip.out0 ~> wrappedFlow ~> zip.in0 | ||
unzip.out1 ~> zip.in1 | ||
zip.out ~> metricsReporter.in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the main change in ReadSide processors. The user-provided flow (which is user code inside a |
||
FlowShape(unzip.in, metricsReporter.out) | ||
}) | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was getting a bit tired of so many
Offset
types in the code and having to scroll to the imports every time to see whatOffset
was the code referring to (Lagom's or Akka Persistence Query) so I started using aliases anywhere I would step into.This change makes the PR bigger than it should but hopefully also makes the PR more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really good. Thanks for doing that. It's making everything much easier to read and helps to review this PR.