
Introduce Projections spi #2727

Merged
merged 11 commits on Apr 1, 2020
1 change: 0 additions & 1 deletion .jvmopts
@@ -1,5 +1,4 @@
-Xms2G
-Xmx2G
-Xss2M
-XX:MaxInlineLevel=18
-XX:MaxMetaspaceSize=1G
1 change: 1 addition & 0 deletions build.sbt
@@ -282,6 +282,7 @@ val mimaSettings: Seq[Setting[_]] = {
}.toSet,
mimaBinaryIssueFilters ++= Seq(
// Add mima filters here.
ProblemFilters.exclude[Problem]("com.lightbend.lagom.internal.*"),
)
)
}
@@ -21,7 +21,7 @@ import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession
import com.lightbend.lagom.javadsl.persistence.AggregateEvent
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag
import com.lightbend.lagom.javadsl.persistence.Offset
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset }
Contributor Author:
I was getting a bit tired of so many Offset types in the code, and of having to scroll to the imports every time to see which Offset the code was referring to (Lagom's or Akka Persistence Query's), so I started using aliases anywhere I stepped in.

This change makes the PR bigger than it should be, but hopefully it also makes it more readable.
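For illustration, the convention is plain import renaming; a minimal sketch with the two aliases this PR uses (OffsetAliasExample and toAkkaOffset are hypothetical, not part of the change):

```scala
// Rename both Offset types at the import site so every signature states which
// one it means, with no need to scroll back to the imports.
import akka.persistence.query.{ Offset => AkkaOffset }
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset }

object OffsetAliasExample {
  // Hypothetical helper: the direction of the conversion is obvious from the types.
  def toAkkaOffset(offset: LagomOffset): AkkaOffset = ???
}
```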

Member:
This is really good, thanks for doing that. It makes everything much easier to read and makes this PR much easier to review.

import org.pcollections.TreePVector
import org.slf4j.LoggerFactory

@@ -40,9 +40,9 @@ private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEve
extends ReadSideHandler[Event] {
private val log = LoggerFactory.getLogger(this.getClass)

protected def invoke(handler: Handler, event: Event, offset: Offset): CompletionStage[JList[BoundStatement]]
protected def invoke(handler: Handler, event: Event, offset: LagomOffset): CompletionStage[JList[BoundStatement]]

override def handle(): Flow[Pair[Event, Offset], Done, _] = {
override def handle(): Flow[Pair[Event, LagomOffset], Done, _] = {
def executeStatements(statements: JList[BoundStatement]): Future[Done] = {
if (statements.isEmpty) {
Future.successful(Done.getInstance())
@@ -54,7 +54,7 @@ }
}

akka.stream.scaladsl
.Flow[Pair[Event, Offset]]
.Flow[Pair[Event, LagomOffset]]
.mapAsync(parallelism = 1) { pair =>
val Pair(event, offset) = pair
val eventClass = event.getClass
@@ -81,10 +81,10 @@ private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEve
* Internal API
*/
private[cassandra] object CassandraAutoReadSideHandler {
type Handler[Event] = (_ <: Event, Offset) => CompletionStage[JList[BoundStatement]]
type Handler[Event] = (_ <: Event, LagomOffset) => CompletionStage[JList[BoundStatement]]

def emptyHandler[Event, E <: Event]: Handler[Event] =
(_: E, _: Offset) => Future.successful(util.Collections.emptyList[BoundStatement]()).toJava
(_: E, _: LagomOffset) => Future.successful(util.Collections.emptyList[BoundStatement]()).toJava
}

/**
@@ -112,12 +112,12 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
protected override def invoke(
handler: Handler[Event],
event: Event,
offset: Offset
offset: LagomOffset
): CompletionStage[JList[BoundStatement]] = {
val boundStatements = {
for {
statements <- handler
.asInstanceOf[(Event, Offset) => CompletionStage[JList[BoundStatement]]]
.asInstanceOf[(Event, LagomOffset) => CompletionStage[JList[BoundStatement]]]
.apply(event, offset)
.toScala
} yield {
@@ -136,7 +136,7 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
globalPrepareCallback.apply()
}

override def prepare(tag: AggregateEventTag[Event]): CompletionStage[Offset] = {
override def prepare(tag: AggregateEventTag[Event]): CompletionStage[LagomOffset] = {
(for {
_ <- prepareCallback.apply(tag).toScala
dao <- offsetStore.prepare(readProcessorId, tag.tag)
@@ -0,0 +1,39 @@
/*
* Copyright (C) Lightbend Inc. <https://www.lightbend.com>
*/

package com.lightbend.lagom.internal.spi.projection

import akka.actor.ActorSystem
import akka.annotation.InternalStableApi
import akka.persistence.query.EventEnvelope
import akka.persistence.query.Offset

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

object ProjectionSpi {

@InternalStableApi
private[lagom] def startProcessing(system: ActorSystem, tagName: String, envelope: EventEnvelope): EventEnvelope =
envelope

@InternalStableApi
private[lagom] def afterUserFlow(projectionName: String, tagName: String, offset: Offset): Offset = offset

@InternalStableApi
private[lagom] def completedProcessing(
projectionName: String,
tagName: String,
offset: Offset
): Offset = offset

@InternalStableApi
private[lagom] def failed(
actorSystem: ActorSystem,
projectionName: String,
partitionName: String,
exception: Throwable
): Unit = ()

}
Contributor Author:

This is the main change in the PR.

Contributor Author:

  private[lagom] def startProcessing(system: ActorSystem, tagName: String, envelope: EventEnvelope): EventEnvelope

This is invoked as soon as an event is read from the journal. See AbstractPersistentEntityRegistry.eventStream.

  private[lagom] def afterUserFlow(projectionName: String, offset: Offset): Offset = offset

This is invoked, when possible, immediately after the user code in the stream completes. This is sometimes not possible (it depends on the projection implementation).

  private[lagom] def completedProcessing(offset: Future[Offset], exCtx: ExecutionContext): Future[Offset]

This is invoked when the offset has been persisted.

  private[lagom] def failed(actorSystem: ActorSystem, projectionName: String, partitionName: String): Unit

This is invoked if a failure traverses the stream of the projection.
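Putting the hooks together, a minimal sketch of the invocation order, using the signatures from the new file above (ignoring the private[lagom] visibility; "my-projection" and "tag1" are made-up names):

```scala
import akka.actor.ActorSystem
import akka.persistence.query.{ EventEnvelope, Offset }
import com.lightbend.lagom.internal.spi.projection.ProjectionSpi

object ProjectionSpiOrderExample {
  // Every hook is a no-op returning its input; a monitoring agent can
  // intercept them (they are annotated @InternalStableApi) to time each stage.
  def traceOneElement(system: ActorSystem, envelope: EventEnvelope): Offset = {
    // 1. as soon as the event is read from the journal
    val env: EventEnvelope = ProjectionSpi.startProcessing(system, "tag1", envelope)

    // ... the user flow processes env.event here ...

    // 2. right after the user code completes (when the implementation allows it)
    val offset: Offset = ProjectionSpi.afterUserFlow("my-projection", "tag1", env.offset)

    // 3. once the offset has been persisted
    ProjectionSpi.completedProcessing("my-projection", "tag1", offset)
  }
}
```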

@@ -23,6 +23,7 @@ import akka.persistence.query.{ Offset => AkkaOffset }
import akka.stream.javadsl
import akka.Done
import akka.NotUsed
import com.lightbend.lagom.internal.spi.projection.ProjectionSpi
import com.lightbend.lagom.javadsl.persistence._
import play.api.inject.Injector

@@ -153,6 +154,7 @@ class AbstractPersistentEntityRegistry(

queries
.eventsByTag(tag, startingOffset)
.map(envelope => ProjectionSpi.startProcessing(system, tag, envelope))
.map { env =>
Pair.create(env.event.asInstanceOf[Event], OffsetAdapter.offsetToDslOffset(env.offset))
}
@@ -4,6 +4,7 @@

package com.lightbend.lagom.internal.javadsl.persistence

import akka.annotation.InternalStableApi
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.Sequence
@@ -16,17 +17,21 @@ import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomJavaDslOffset }
* Converts between the Akka Persistence Query Offset type and the internal Lagom Java Offset type
*/
object OffsetAdapter {

@InternalStableApi
def offsetToDslOffset(offset: Offset): LagomJavaDslOffset = offset match {
case TimeBasedUUID(uuid) => LagomJavaDslOffset.timeBasedUUID(uuid)
case Sequence(value) => LagomJavaDslOffset.sequence(value)
case NoOffset => LagomJavaDslOffset.NONE
case _ => throw new IllegalArgumentException("Unsuppoerted offset type " + offset.getClass.getName)
case _ => throw new IllegalArgumentException("Unsupported offset type " + offset.getClass.getName)
Contributor Author:

typo

}

@InternalStableApi
def dslOffsetToOffset(dslOffset: LagomJavaDslOffset): Offset = dslOffset match {
case uuid: LagomJavaDslOffset.TimeBasedUUID => TimeBasedUUID(uuid.value())
case seq: LagomJavaDslOffset.Sequence => Sequence(seq.value())
case LagomJavaDslOffset.NONE => NoOffset
case _ => throw new IllegalArgumentException("Unsuppoerted offset type " + dslOffset.getClass.getName)
case _ => throw new IllegalArgumentException("Unsupported offset type " + dslOffset.getClass.getName)
}

}
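As a REPL-style usage sketch based on the signatures above, the two conversions round-trip for the supported offset types:

```scala
import akka.persistence.query.{ NoOffset, Sequence }
import com.lightbend.lagom.internal.javadsl.persistence.OffsetAdapter
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset }

val lagomSeq: LagomOffset = OffsetAdapter.offsetToDslOffset(Sequence(42L))
assert(OffsetAdapter.dslOffsetToOffset(lagomSeq) == Sequence(42L)) // round-trips
assert(OffsetAdapter.offsetToDslOffset(NoOffset) == LagomOffset.NONE)
```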
@@ -19,25 +19,38 @@ import akka.stream.scaladsl
import akka.util.Timeout
import akka.Done
import akka.NotUsed
import akka.japi
import akka.persistence.query.{ Offset => AkkaOffset }
import akka.stream.FlowShape
import akka.stream.javadsl
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.GraphDSL
import akka.stream.scaladsl.Unzip
import akka.stream.scaladsl.Zip
import com.lightbend.lagom.internal.persistence.ReadSideConfig
import com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTask
import com.lightbend.lagom.javadsl.persistence._
import com.lightbend.lagom.internal.projection.ProjectionRegistryActor.WorkerCoordinates
import com.lightbend.lagom.internal.spi.projection.ProjectionSpi
import com.lightbend.lagom.javadsl.persistence.AggregateEvent
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset }
Contributor Author:

Without the Offset aliases, this ReadSideActor was impossible to reason about; once I added the userFlowWrapper it became too confusing.


import scala.compat.java8.FutureConverters._
import scala.concurrent.Future

private[lagom] object ReadSideActor {
def props[Event <: AggregateEvent[Event]](
tagName: String,
workerCoordinates: WorkerCoordinates,
config: ReadSideConfig,
clazz: Class[Event],
globalPrepareTask: ClusterStartupTask,
eventStreamFactory: (AggregateEventTag[Event], Offset) => Source[akka.japi.Pair[Event, Offset], NotUsed],
eventStreamFactory: (AggregateEventTag[Event], LagomOffset) => Source[akka.japi.Pair[Event, LagomOffset], NotUsed],
processor: () => ReadSideProcessor[Event]
)(implicit mat: Materializer) =
Props(
new ReadSideActor[Event](
tagName,
workerCoordinates,
config,
clazz,
globalPrepareTask,
@@ -53,11 +66,11 @@ private[lagom] object ReadSideActor {
* Read side actor
*/
private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
tagName: String,
workerCoordinates: WorkerCoordinates,
Contributor Author:

This was a pending improvement: having both the projection name and the tag name inside the actor.
The need to report the projection name in the metrics finally twisted my arm into implementing it.
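For reference, a sketch of the assumed shape of WorkerCoordinates (the real definition lives in com.lightbend.lagom.internal.projection.ProjectionRegistryActor):

```scala
// Assumed shape: bundles the projection name and the tag name that this actor
// previously received as a lone tagName.
final case class WorkerCoordinates(projectionName: String, tagName: String)
```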

config: ReadSideConfig,
clazz: Class[Event],
globalPrepareTask: ClusterStartupTask,
eventStreamFactory: (AggregateEventTag[Event], Offset) => Source[akka.japi.Pair[Event, Offset], NotUsed],
eventStreamFactory: (AggregateEventTag[Event], LagomOffset) => Source[akka.japi.Pair[Event, LagomOffset], NotUsed],
processorFactory: () => ReadSideProcessor[Event]
)(implicit mat: Materializer)
extends Actor
@@ -68,6 +81,8 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](

private var shutdown: Option[KillSwitch] = None

val tagName = workerCoordinates.tagName

override def postStop: Unit = {
shutdown.foreach(_.shutdown())
}
@@ -93,15 +108,39 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
config.randomBackoffFactor
) { () =>
val handler: ReadSideProcessor.ReadSideHandler[Event] = processorFactory().buildHandler()
val futureOffset: Future[Offset] = handler.prepare(tag).toScala
val futureOffset: Future[LagomOffset] = handler.prepare(tag).toScala

scaladsl.Source
.fromFuture(futureOffset)
.future(futureOffset)
Contributor Author:

This was already implemented in #2614, but that PR was wrongly targeting master. When forward-merging 1.6.x into master this will CONFLICT.

Member:

I guess not since it's exactly the same change.

.initialTimeout(config.offsetTimeout)
.flatMapConcat { offset =>
val eventStreamSource = eventStreamFactory(tag, offset).asScala
val usersFlow = handler.handle()
eventStreamSource.via(usersFlow)
val eventStreamSource: scaladsl.Source[japi.Pair[Event, LagomOffset], NotUsed] =
eventStreamFactory(tag, offset).asScala
val userFlow: javadsl.Flow[japi.Pair[Event, LagomOffset], Done, _] =
handler
.handle()
.asScala
.watchTermination() { (_, right) =>
right.recoverWith {
case t: Throwable =>
ProjectionSpi.failed(
context.system,
workerCoordinates.projectionName,
workerCoordinates.tagName,
t
)
right
}
}
.asJava

val wrappedFlow = Flow[japi.Pair[Event, LagomOffset]]
.map { pair =>
(pair, OffsetAdapter.dslOffsetToOffset(pair.second))
}
.via(userFlowWrapper(workerCoordinates, userFlow.asScala))

eventStreamSource.via(wrappedFlow)
}
}

@@ -122,4 +161,33 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
// This actor will be restarted by WorkerCoordinator
throw cause
}

private def userFlowWrapper(
workerCoordinates: WorkerCoordinates,
userFlow: Flow[japi.Pair[Event, LagomOffset], Done, _]
): Flow[(japi.Pair[Event, LagomOffset], AkkaOffset), AkkaOffset, _] =
Flow.fromGraph(GraphDSL.create(userFlow) { implicit builder => wrappedFlow =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[japi.Pair[Event, LagomOffset], AkkaOffset])
val zip = builder.add(Zip[Done, AkkaOffset])
val metricsReporter: FlowShape[(Done, AkkaOffset), AkkaOffset] = builder.add(Flow.fromFunction {
case (_, akkaOffset) =>
// TODO: in ReadSide processor we can't report `afterUserFlow` and `completedProcessing` separately
// as we do in TopicProducerActor, unless we moved the invocation of `afterUserFlow` to each
// particular ReadSideHandler (C* and JDBC).
ProjectionSpi.afterUserFlow(workerCoordinates.projectionName, workerCoordinates.tagName, akkaOffset)
ProjectionSpi.completedProcessing(
workerCoordinates.projectionName,
workerCoordinates.tagName,
akkaOffset
)
akkaOffset
})

unzip.out0 ~> wrappedFlow ~> zip.in0
unzip.out1 ~> zip.in1
zip.out ~> metricsReporter.in
Contributor Author:

This is the main change in ReadSide processors. The user-provided flow (which is user code inside a ReadSideHandler, so both user statements and OffsetStore statements) is now wrapped, and a side-channel lets the offset propagate alongside the user flow. The side-channel zips back in and produces an Offset for the metrics. (See the standalone sketch of this pattern after the class below.)

FlowShape(unzip.in, metricsReporter.out)
})

}
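To see the pattern in isolation, here is a self-contained sketch of the same Unzip/Zip side-channel reduced to plain types (withSideChannel is a made-up name; like handle(), the wrapped flow is assumed to emit exactly one Done per input element):

```scala
import akka.Done
import akka.stream.FlowShape
import akka.stream.scaladsl.{ Flow, GraphDSL, Unzip, Zip }

object SideChannelExample {
  // Wraps a user flow so a context value C (here: the offset) bypasses the
  // user code and is re-attached to the corresponding Done on the way out.
  def withSideChannel[A, C](userFlow: Flow[A, Done, _]): Flow[(A, C), C, _] =
    Flow.fromGraph(GraphDSL.create(userFlow) { implicit builder => wrapped =>
      import GraphDSL.Implicits._
      val unzip    = builder.add(Unzip[A, C])
      val zip      = builder.add(Zip[Done, C])
      val dropDone = builder.add(Flow[(Done, C)].map { case (_, c) => c })

      unzip.out0 ~> wrapped ~> zip.in0 // A flows through the user code
      unzip.out1 ~> zip.in1            // C bypasses it on the side-channel
      zip.out ~> dropDone.in           // re-pair, then keep only C

      FlowShape(unzip.in, dropDone.out)
    })
}
```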
@@ -97,7 +97,7 @@ private[lagom] class ReadSideImpl @Inject() (

val readSidePropsFactory = (coordinates: WorkerCoordinates) =>
ReadSideActor.props(
coordinates.tagName,
coordinates,
config,
eventClass,
globalPrepareTask,
@@ -4,20 +4,15 @@

package com.lightbend.lagom.javadsl.persistence

import java.util.concurrent.CompletionStage
import java.util.Optional
import java.util.UUID

import akka.NotUsed
import akka.japi.Pair
import akka.japi.function.Creator
import akka.stream.javadsl
import akka.Done
import akka.NotUsed
import com.lightbend.lagom.javadsl.persistence.Offset.Sequence
import com.lightbend.lagom.javadsl.persistence.Offset.TimeBasedUUID

import scala.concurrent.duration._

/**
* At system startup all [[PersistentEntity]] classes must be registered here
* with [[PersistentEntityRegistry#register]].
@@ -126,7 +126,7 @@ trait AbstractReadSideSpec extends ImplicitSender with ScalaFutures with Eventua

val processorProps = (coordinates: WorkerCoordinates) =>
ReadSideActor.props[TestEntity.Evt](
coordinates.tagName,
coordinates,
ReadSideConfig(),
classOf[TestEntity.Evt],
clusterStartup,
@@ -4,28 +4,25 @@

package com.lightbend.lagom.internal.scaladsl.persistence

import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.actor.CoordinatedShutdown
import akka.cluster.Cluster
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion
import akka.event.Logging
import akka.pattern.ask
import akka.persistence.query.Offset
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.stream.scaladsl
import akka.util.Timeout
import akka.Done
import akka.NotUsed
import akka.annotation.InternalStableApi
import akka.persistence.query.EventEnvelope
import com.lightbend.lagom.internal.spi.projection.ProjectionSpi
import com.lightbend.lagom.scaladsl.persistence._
import scala.concurrent.Future

import scala.concurrent.duration._
import scala.reflect.ClassTag

@@ -135,6 +132,7 @@ class AbstractPersistentEntityRegistry(system: ActorSystem) extends PersistentEn

queries
.eventsByTag(tag, fromOffset)
.map(envelope => ProjectionSpi.startProcessing(system, tag, envelope))
.map(env =>
new EventStreamElement[Event](
PersistentEntityActor.extractEntityId(env.persistenceId),