Skip to content

Commit

Permalink
Adds side channel for ReadSideActor (javadsl). Other code cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
ignasi35 committed Mar 19, 2020
1 parent 2780e38 commit 8ed60c4
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 63 deletions.
Expand Up @@ -21,7 +21,7 @@ import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession
import com.lightbend.lagom.javadsl.persistence.AggregateEvent
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag
import com.lightbend.lagom.javadsl.persistence.Offset
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset }
import org.pcollections.TreePVector
import org.slf4j.LoggerFactory

Expand All @@ -40,9 +40,9 @@ private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEve
extends ReadSideHandler[Event] {
private val log = LoggerFactory.getLogger(this.getClass)

protected def invoke(handler: Handler, event: Event, offset: Offset): CompletionStage[JList[BoundStatement]]
protected def invoke(handler: Handler, event: Event, offset: LagomOffset): CompletionStage[JList[BoundStatement]]

override def handle(): Flow[Pair[Event, Offset], Done, _] = {
override def handle(): Flow[Pair[Event, LagomOffset], Done, _] = {
def executeStatements(statements: JList[BoundStatement]): Future[Done] = {
if (statements.isEmpty) {
Future.successful(Done.getInstance())
Expand All @@ -54,7 +54,7 @@ private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEve
}

akka.stream.scaladsl
.Flow[Pair[Event, Offset]]
.Flow[Pair[Event, LagomOffset]]
.mapAsync(parallelism = 1) { pair =>
val Pair(event, offset) = pair
val eventClass = event.getClass
Expand All @@ -81,10 +81,10 @@ private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEve
* Internal API
*/
private[cassandra] object CassandraAutoReadSideHandler {
type Handler[Event] = (_ <: Event, Offset) => CompletionStage[JList[BoundStatement]]
type Handler[Event] = (_ <: Event, LagomOffset) => CompletionStage[JList[BoundStatement]]

def emptyHandler[Event, E <: Event]: Handler[Event] =
(_: E, _: Offset) => Future.successful(util.Collections.emptyList[BoundStatement]()).toJava
(_: E, _: LagomOffset) => Future.successful(util.Collections.emptyList[BoundStatement]()).toJava
}

/**
Expand Down Expand Up @@ -112,12 +112,12 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
protected override def invoke(
handler: Handler[Event],
event: Event,
offset: Offset
offset: LagomOffset
): CompletionStage[JList[BoundStatement]] = {
val boundStatements = {
for {
statements <- handler
.asInstanceOf[(Event, Offset) => CompletionStage[JList[BoundStatement]]]
.asInstanceOf[(Event, LagomOffset) => CompletionStage[JList[BoundStatement]]]
.apply(event, offset)
.toScala
} yield {
Expand All @@ -136,7 +136,7 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
globalPrepareCallback.apply()
}

override def prepare(tag: AggregateEventTag[Event]): CompletionStage[Offset] = {
override def prepare(tag: AggregateEventTag[Event]): CompletionStage[LagomOffset] = {
(for {
_ <- prepareCallback.apply(tag).toScala
dao <- offsetStore.prepare(readProcessorId, tag.tag)
Expand Down
Expand Up @@ -23,6 +23,7 @@ import akka.persistence.query.{ Offset => AkkaOffset }
import akka.stream.javadsl
import akka.Done
import akka.NotUsed
import com.lightbend.lagom.internal.spi.projection.ProjectionSpi
import com.lightbend.lagom.javadsl.persistence._
import play.api.inject.Injector

Expand Down Expand Up @@ -153,6 +154,7 @@ class AbstractPersistentEntityRegistry(

queries
.eventsByTag(tag, startingOffset)
.map(envelope => ProjectionSpi.startProcessing(system, tag, envelope))
.map { env =>
Pair.create(env.event.asInstanceOf[Event], OffsetAdapter.offsetToDslOffset(env.offset))
}
Expand Down
Expand Up @@ -4,6 +4,7 @@

package com.lightbend.lagom.internal.javadsl.persistence

import akka.annotation.InternalStableApi
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.Sequence
Expand All @@ -16,17 +17,21 @@ import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomJavaDslOffset }
* Converts between the Akka Persistence Query Offset type and the internal Lagom Java Offset type
*/
object OffsetAdapter {

@InternalStableApi
def offsetToDslOffset(offset: Offset): LagomJavaDslOffset = offset match {
case TimeBasedUUID(uuid) => LagomJavaDslOffset.timeBasedUUID(uuid)
case Sequence(value) => LagomJavaDslOffset.sequence(value)
case NoOffset => LagomJavaDslOffset.NONE
case _ => throw new IllegalArgumentException("Unsuppoerted offset type " + offset.getClass.getName)
case _ => throw new IllegalArgumentException("Unsupported offset type " + offset.getClass.getName)
}

@InternalStableApi
def dslOffsetToOffset(dslOffset: LagomJavaDslOffset): Offset = dslOffset match {
case uuid: LagomJavaDslOffset.TimeBasedUUID => TimeBasedUUID(uuid.value())
case seq: LagomJavaDslOffset.Sequence => Sequence(seq.value())
case LagomJavaDslOffset.NONE => NoOffset
case _ => throw new IllegalArgumentException("Unsuppoerted offset type " + dslOffset.getClass.getName)
case _ => throw new IllegalArgumentException("Unsupported offset type " + dslOffset.getClass.getName)
}

}
Expand Up @@ -19,25 +19,38 @@ import akka.stream.scaladsl
import akka.util.Timeout
import akka.Done
import akka.NotUsed
import akka.japi
import akka.persistence.query.{ Offset => AkkaOffset }
import akka.stream.FlowShape
import akka.stream.javadsl
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.GraphDSL
import akka.stream.scaladsl.Unzip
import akka.stream.scaladsl.Zip
import com.lightbend.lagom.internal.persistence.ReadSideConfig
import com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTask
import com.lightbend.lagom.javadsl.persistence._
import com.lightbend.lagom.internal.projection.ProjectionRegistryActor.WorkerCoordinates
import com.lightbend.lagom.internal.spi.projection.ProjectionSpi
import com.lightbend.lagom.javadsl.persistence.AggregateEvent
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor
import com.lightbend.lagom.javadsl.persistence.{ Offset => LagomOffset }

import scala.compat.java8.FutureConverters._
import scala.concurrent.Future

private[lagom] object ReadSideActor {
def props[Event <: AggregateEvent[Event]](
tagName: String,
workerCoordinates: WorkerCoordinates,
config: ReadSideConfig,
clazz: Class[Event],
globalPrepareTask: ClusterStartupTask,
eventStreamFactory: (AggregateEventTag[Event], Offset) => Source[akka.japi.Pair[Event, Offset], NotUsed],
eventStreamFactory: (AggregateEventTag[Event], LagomOffset) => Source[akka.japi.Pair[Event, LagomOffset], NotUsed],
processor: () => ReadSideProcessor[Event]
)(implicit mat: Materializer) =
Props(
new ReadSideActor[Event](
tagName,
workerCoordinates,
config,
clazz,
globalPrepareTask,
Expand All @@ -53,11 +66,11 @@ private[lagom] object ReadSideActor {
* Read side actor
*/
private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
tagName: String,
workerCoordinates: WorkerCoordinates,
config: ReadSideConfig,
clazz: Class[Event],
globalPrepareTask: ClusterStartupTask,
eventStreamFactory: (AggregateEventTag[Event], Offset) => Source[akka.japi.Pair[Event, Offset], NotUsed],
eventStreamFactory: (AggregateEventTag[Event], LagomOffset) => Source[akka.japi.Pair[Event, LagomOffset], NotUsed],
processorFactory: () => ReadSideProcessor[Event]
)(implicit mat: Materializer)
extends Actor
Expand All @@ -68,6 +81,8 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](

private var shutdown: Option[KillSwitch] = None

val tagName = workerCoordinates.tagName

override def postStop: Unit = {
shutdown.foreach(_.shutdown())
}
Expand All @@ -93,15 +108,24 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
config.randomBackoffFactor
) { () =>
val handler: ReadSideProcessor.ReadSideHandler[Event] = processorFactory().buildHandler()
val futureOffset: Future[Offset] = handler.prepare(tag).toScala
val futureOffset: Future[LagomOffset] = handler.prepare(tag).toScala

scaladsl.Source
.fromFuture(futureOffset)
.future(futureOffset)
.initialTimeout(config.offsetTimeout)
.flatMapConcat { offset =>
val eventStreamSource = eventStreamFactory(tag, offset).asScala
val usersFlow = handler.handle()
eventStreamSource.via(usersFlow)
val eventStreamSource: scaladsl.Source[japi.Pair[Event, LagomOffset], NotUsed] =
eventStreamFactory(tag, offset).asScala
val userFlow: javadsl.Flow[japi.Pair[Event, LagomOffset], Done, _] =
handler
.handle()
val wrappedFlow = Flow[japi.Pair[Event, LagomOffset]]
.map { pair =>
(pair, OffsetAdapter.dslOffsetToOffset(pair.second))
}
.via(userFlowWrapper(workerCoordinates, userFlow.asScala))

eventStreamSource.via(wrappedFlow)
}
}

Expand All @@ -122,4 +146,29 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
// This actor will be restarted by WorkerCoordinator
throw cause
}

private def userFlowWrapper(
workerCoordinates: WorkerCoordinates,
userFlow: Flow[japi.Pair[Event, LagomOffset], Done, _]
): Flow[(japi.Pair[Event, LagomOffset], AkkaOffset), AkkaOffset, _] =
Flow.fromGraph(GraphDSL.create(userFlow) { implicit builder => wrappedFlow =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[japi.Pair[Event, LagomOffset], AkkaOffset])
val zip = builder.add(Zip[Done, AkkaOffset])
val metricsReporter: FlowShape[(Done, AkkaOffset), AkkaOffset] = builder.add(Flow.fromFunction {
e: (Done, AkkaOffset) =>
// TODO: in ReadSide processor we can't report `afterUserFlow` and `completedProcessing` separately
// as we do in TopicProducerActor, unless we moved the invocation of `afterUserFlow` to each
// particular ReadSideHandler (C* and JDBC).
ProjectionSpi.afterUserFlow(workerCoordinates.projectionName, e._2)
ProjectionSpi.completedProcessing(Future(e._2), context.dispatcher)
e._2
})

unzip.out0 ~> wrappedFlow ~> zip.in0
unzip.out1 ~> zip.in1
zip.out ~> metricsReporter.in
FlowShape(unzip.in, metricsReporter.out)
})

}
Expand Up @@ -97,7 +97,7 @@ private[lagom] class ReadSideImpl @Inject() (

val readSidePropsFactory = (coordinates: WorkerCoordinates) =>
ReadSideActor.props(
coordinates.tagName,
coordinates,
config,
eventClass,
globalPrepareTask,
Expand Down
Expand Up @@ -4,20 +4,15 @@

package com.lightbend.lagom.javadsl.persistence

import java.util.concurrent.CompletionStage
import java.util.Optional
import java.util.UUID

import akka.NotUsed
import akka.japi.Pair
import akka.japi.function.Creator
import akka.stream.javadsl
import akka.Done
import akka.NotUsed
import com.lightbend.lagom.javadsl.persistence.Offset.Sequence
import com.lightbend.lagom.javadsl.persistence.Offset.TimeBasedUUID

import scala.concurrent.duration._

/**
* At system startup all [[PersistentEntity]] classes must be registered here
* with [[PersistentEntityRegistry#register]].
Expand Down
Expand Up @@ -4,32 +4,31 @@

package com.lightbend.lagom.internal.scaladsl.persistence

import akka.Done
import akka.NotUsed
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.Status
import akka.persistence.query.Offset
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.RestartSource
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.persistence.query.{ Offset => AkkaOffset }
import akka.stream.FlowShape
import akka.stream.KillSwitch
import akka.stream.KillSwitches
import akka.stream.Materializer
import akka.util.Timeout
import akka.Done
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.GraphDSL
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.RestartSource
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Unzip
import akka.stream.scaladsl.Zip
import akka.util.Timeout
import com.lightbend.lagom.internal.persistence.ReadSideConfig
import com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTask
import com.lightbend.lagom.internal.projection.ProjectionRegistryActor.WorkerCoordinates
import com.lightbend.lagom.internal.spi.projection.ProjectionSpi
import com.lightbend.lagom.scaladsl.persistence._
import com.lightbend.lagom.spi.persistence.OffsetDao

import scala.concurrent.Future

Expand All @@ -39,7 +38,7 @@ private[lagom] object ReadSideActor {
config: ReadSideConfig,
clazz: Class[Event],
globalPrepareTask: ClusterStartupTask,
eventStreamFactory: (AggregateEventTag[Event], Offset) => Source[EventStreamElement[Event], NotUsed],
eventStreamFactory: (AggregateEventTag[Event], AkkaOffset) => Source[EventStreamElement[Event], NotUsed],
processor: () => ReadSideProcessor[Event]
)(implicit mat: Materializer) =
Props(
Expand All @@ -65,7 +64,7 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
config: ReadSideConfig,
clazz: Class[Event],
globalPrepareTask: ClusterStartupTask,
eventStreamFactory: (AggregateEventTag[Event], Offset) => Source[EventStreamElement[Event], NotUsed],
eventStreamFactory: (AggregateEventTag[Event], AkkaOffset) => Source[EventStreamElement[Event], NotUsed],
processor: () => ReadSideProcessor[Event]
)(implicit mat: Materializer)
extends Actor
Expand Down Expand Up @@ -97,17 +96,17 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
def receive: Receive = {
case Start =>
val tag = new AggregateEventTag(clazz, tagName)
val backOffSource: Source[Offset, NotUsed] =
val backOffSource: Source[AkkaOffset, NotUsed] =
RestartSource.withBackoff(
config.minBackoff,
config.maxBackoff,
config.randomBackoffFactor
) { () =>
val handler: ReadSideProcessor.ReadSideHandler[Event] = processor().buildHandler()
val futureOffset: Future[Offset] = handler.prepare(tag)
val futureAkkaOffset: Future[AkkaOffset] = handler.prepare(tag)

Source
.future(futureOffset)
.future(futureAkkaOffset)
.initialTimeout(config.offsetTimeout)
.flatMapConcat { offset =>
val eventStreamSource: Source[EventStreamElement[Event], NotUsed] = eventStreamFactory(tag, offset)
Expand Down Expand Up @@ -143,13 +142,13 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
private def userFlowWrapper(
workerCoordinates: WorkerCoordinates,
userFlow: Flow[EventStreamElement[Event], Done, NotUsed]
): Flow[(EventStreamElement[Event], Offset), Offset, NotUsed] =
): Flow[(EventStreamElement[Event], AkkaOffset), AkkaOffset, NotUsed] =
Flow.fromGraph(GraphDSL.create(userFlow) { implicit builder => wrappedFlow =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[EventStreamElement[Event], Offset])
val zip = builder.add(Zip[Done, Offset])
val metricsReporter: FlowShape[(Done, Offset), Offset] = builder.add(Flow.fromFunction {
e: (Done, Offset) =>
val unzip = builder.add(Unzip[EventStreamElement[Event], AkkaOffset])
val zip = builder.add(Zip[Done, AkkaOffset])
val metricsReporter: FlowShape[(Done, AkkaOffset), AkkaOffset] = builder.add(Flow.fromFunction {
e: (Done, AkkaOffset) =>
// TODO: in ReadSide processor we can't report `afterUserFlow` and `completedProcessing` separately
// as we do in TopicProducerActor, unless we moved the invocation of `afterUserFlow` to each
// particular ReadSideImpl (C* and JDBC).
Expand Down
Expand Up @@ -7,7 +7,7 @@ package com.lightbend.lagom.internal.broker.kafka
import java.net.URI

import akka.actor.ActorSystem
import akka.persistence.query.Offset
import akka.persistence.query.{ Offset => AkkaOffset }
import akka.stream.Materializer
import akka.stream.scaladsl._
import com.lightbend.lagom.internal.projection.ProjectionRegistry
Expand All @@ -29,7 +29,7 @@ private[lagom] object Producer {
kafkaConfig: KafkaConfig,
locateService: String => Future[Seq[URI]],
topicId: String,
eventStreamFactory: (String, Offset) => Source[(Message, Offset), _],
eventStreamFactory: (String, AkkaOffset) => Source[(Message, AkkaOffset), _],
partitionKeyStrategy: Option[Message => String],
serializer: Serializer[Message],
offsetStore: OffsetStore,
Expand Down

0 comments on commit 8ed60c4

Please sign in to comment.