Saves offset for unhandled events (#915)
* saves offset for unhandled events (resolves #904); a condensed sketch of this pattern follows the list

* adds logging back for unhandled events

* saves offset for unhandled events for JDBC, JPA and Slick

* slightly improved JPA version

* just formatting

* Extended tests for storing offsets on unhandled events

* bring similar code together - count and offset check next to each other

* typo

* formatting

* uses the `batch.addAll` API and extracts `Pair`

* removes offset type parameter

* controlled shutdown of read side after each test

* Using duration's .seconds and .millis instead of ScalaTest Span

* using Batch API in all cases

* checking for empty statements
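
For orientation, here is a condensed, hypothetical sketch of the pattern the diff below applies to every read-side handler (Cassandra, JDBC, JPA and Slick): when an event has no registered handler, fall back to an empty handler instead of skipping the element, so the offset is still stored and the projection does not reprocess the same events after a restart. The names `Offset`, `Handler`, `saveOffset` and `process` are simplified stand-ins, not the actual Lagom API.

```scala
import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical, simplified types: the real code uses Lagom's Offset,
// lists of BoundStatements / DBIO actions, and an offset DAO.
object OffsetForUnhandledEventsSketch {

  type Offset         = Long
  type Handler[Event] = Event => Future[Unit]

  // An event without a registered handler gets this no-op handler,
  // so processing still reaches the offset-saving step.
  def emptyHandler[Event]: Handler[Event] =
    _ => Future.successful(())

  def process[Event](
    handlers:   Map[Class[_], Handler[Event]],
    saveOffset: Offset => Future[Unit] // stand-in for the offset DAO call
  )(event: Event, offset: Offset)(implicit ec: ExecutionContext): Future[Unit] =
    for {
      _ <- handlers.getOrElse(event.getClass, emptyHandler[Event])(event) // run user statements, if any
      _ <- saveOffset(offset)                                             // offset is saved in every case
    } yield ()
}
```

In the actual commit the same idea appears as `CassandraAutoReadSideHandler.emptyHandler` on the Cassandra side and as a `DBIO.successful(())` fallback followed by `offsetDao.updateOffsetQuery` for the JDBC, JPA and Slick read sides.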
Renato Cavalcanti authored and gmethvin committed Aug 14, 2017
1 parent 7e47b84 commit 443243e
Showing 14 changed files with 352 additions and 172 deletions.
@@ -3,6 +3,7 @@
*/
package com.lightbend.lagom.internal.javadsl.persistence.cassandra

import java.util
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import java.util.{ UUID, List => JList }
@@ -28,44 +29,59 @@ import scala.concurrent.{ ExecutionContext, Future }
* Internal API
*/
private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEvent[Event], Handler](
session: CassandraSession, handlers: Map[Class[_ <: Event], Handler], dispatcher: String
session: CassandraSession,
handlers: Map[Class[_ <: Event], Handler],
dispatcher: String
)(implicit ec: ExecutionContext) extends ReadSideHandler[Event] {

private val log = LoggerFactory.getLogger(this.getClass)

protected def invoke(handler: Handler, event: Event, offset: Offset): CompletionStage[JList[BoundStatement]]

override def handle(): Flow[Pair[Event, Offset], Done, _] = {
akka.stream.scaladsl.Flow[Pair[Event, Offset]].mapAsync(parallelism = 1) { pair =>
handlers.get(pair.first.getClass.asInstanceOf[Class[Event]]) match {
case Some(handler) =>
for {
statements <- invoke(handler, pair.first, pair.second).toScala
done <- statements.size match {
case 0 => Future.successful(Done.getInstance())
case 1 => session.executeWrite(statements.get(0)).toScala
case _ =>
val batch = new BatchStatement
val iter = statements.iterator()
while (iter.hasNext)
batch.add(iter.next)
session.executeWriteBatch(batch).toScala
}
} yield done
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", pair.first.getClass.getName)
Future.successful(Done.getInstance())

def executeStatements(statements: JList[BoundStatement]): Future[Done] = {
if (statements.isEmpty) {
Future.successful(Done.getInstance())
} else {
val batch = new BatchStatement
batch.addAll(statements)
session.executeWriteBatch(batch).toScala
}
}.withAttributes(ActorAttributes.dispatcher(dispatcher)).asJava
}

akka.stream.scaladsl.Flow[Pair[Event, Offset]]
.mapAsync(parallelism = 1) { pair =>

val Pair(event, offset) = pair
val eventClass = event.getClass

val handler =
handlers.getOrElse(
// lookup handler
eventClass,
// fallback to empty handler if none
{
if (log.isDebugEnabled()) log.debug("Unhandled event [{}]", eventClass.getName)
CassandraAutoReadSideHandler.emptyHandler[Event, Event].asInstanceOf[Handler]
}
)

invoke(handler, event, offset).toScala.flatMap(executeStatements)

}.withAttributes(ActorAttributes.dispatcher(dispatcher)).asJava
}
}

/**
* Internal API
*/
private[cassandra] object CassandraAutoReadSideHandler {

type Handler[Event] = (_ <: Event, Offset) => CompletionStage[JList[BoundStatement]]

def emptyHandler[Event, E <: Event]: Handler[Event] =
(_: E, _: Offset) => Future.successful(util.Collections.emptyList[BoundStatement]()).toJava
}

/**
@@ -92,10 +108,13 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
override protected def invoke(handler: Handler[Event], event: Event, offset: Offset): CompletionStage[JList[BoundStatement]] = {
val boundStatements = {
for {
statements <- (handler.asInstanceOf[(Event, Offset) => CompletionStage[JList[BoundStatement]]].apply(event, offset).toScala)
statements <- handler.asInstanceOf[(Event, Offset) => CompletionStage[JList[BoundStatement]]].apply(event, offset).toScala
} yield {
val akkaOffset = OffsetAdapter.dslOffsetToOffset(offset)
TreePVector.from(statements).plus(offsetDao.bindSaveOffset(akkaOffset)).asInstanceOf[JList[BoundStatement]]
TreePVector
.from(statements)
.plus(offsetDao.bindSaveOffset(akkaOffset))
.asInstanceOf[JList[BoundStatement]]
}
}

@@ -115,6 +134,7 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
OffsetAdapter.offsetToDslOffset(dao.loadedOffset)
}).toJava
}

}

/**
@@ -9,6 +9,7 @@ import com.google.inject.Guice
import com.lightbend.lagom.internal.javadsl.persistence.cassandra.{ CassandraPersistentEntityRegistry, CassandraReadSideImpl, JavadslCassandraOffsetStore }
import com.lightbend.lagom.internal.persistence.ReadSideConfig
import com.lightbend.lagom.internal.persistence.cassandra.CassandraReadSideSettings
import com.lightbend.lagom.javadsl.persistence.Offset.TimeBasedUUID
import com.lightbend.lagom.javadsl.persistence._
import com.typesafe.config.ConfigFactory

@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory

import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import scala.collection.JavaConverters._

/**
* Internal API
@@ -32,39 +33,45 @@ private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEve

override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] = {

def invokeHandler(handler: Handler, elem: EventStreamElement[Event]): Future[Done] = {
for {
statements <- invoke(handler, elem)
done <- statements.size match {
case 0 => Future.successful(Done)
case 1 => session.executeWrite(statements.head)
case _ =>
val batch = new BatchStatement
val iter = statements.iterator
while (iter.hasNext)
batch.add(iter.next)
session.executeWriteBatch(batch)
}
} yield done
def executeStatements(statements: Seq[BoundStatement]): Future[Done] = {
val batch = new BatchStatement
// statements is never empty, there is at least the store offset statement
// for simplicity we just use batch api (even if there is only one)
batch.addAll(statements.asJava)
session.executeWriteBatch(batch)
}

Flow[EventStreamElement[Event]].mapAsync(parallelism = 1) { elem =>
handlers.get(elem.event.getClass.asInstanceOf[Class[Event]]) match {
case Some(handler) => invokeHandler(handler, elem)
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", elem.event.getClass.getName)
Future.successful(Done)
}
}.withAttributes(ActorAttributes.dispatcher(dispatcher))
Flow[EventStreamElement[Event]]
.mapAsync(parallelism = 1) { elem =>

val eventClass = elem.event.getClass

val handler =
handlers.getOrElse(
// lookup handler
eventClass,
// fallback to empty handler if none
{
if (log.isDebugEnabled()) log.debug("Unhandled event [{}]", eventClass.getName)
CassandraAutoReadSideHandler.emptyHandler.asInstanceOf[Handler]
}
)

invoke(handler, elem).flatMap(executeStatements)

}.withAttributes(ActorAttributes.dispatcher(dispatcher))
}
}

/**
* Internal API
*/
private[cassandra] object CassandraAutoReadSideHandler {

type Handler[Event] = (EventStreamElement[_ <: Event]) => Future[immutable.Seq[BoundStatement]]

def emptyHandler[Event]: Handler[Event] =
(_) => Future.successful(immutable.Seq.empty[BoundStatement])
}

/**
@@ -96,6 +103,9 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
} yield statements :+ offsetDao.bindSaveOffset(element.offset)
}

protected def offsetStatement(offset: Offset): immutable.Seq[BoundStatement] =
immutable.Seq(offsetDao.bindSaveOffset(offset))

override def globalPrepare(): Future[Done] = {
globalPrepareCallback.apply()
}
@@ -3,9 +3,10 @@
*/
package com.lightbend.lagom.scaladsl.persistence.cassandra

import akka.persistence.query.TimeBasedUUID

import scala.concurrent.Future
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory
import com.lightbend.lagom.internal.persistence.ReadSideConfig
import com.lightbend.lagom.internal.persistence.cassandra.CassandraReadSideSettings
@@ -95,25 +95,30 @@ private[lagom] class JdbcReadSideImpl @Inject() (slick: SlickProvider, offsetSto
}

override def handle(): Flow[Pair[Event, Offset], Done, Any] = {
akka.stream.scaladsl.Flow[Pair[Event, Offset]].mapAsync(parallelism = 1) { pair =>
eventHandlers.get(pair.first.getClass) match {
case Some(handler) =>
slick.db.run {
(for {
_ <- SimpleDBIO { ctx =>
handler.asInstanceOf[(Connection, Event, Offset) => Unit](ctx.connection, pair.first, pair.second)
}
_ <- offsetDao.updateOffsetQuery(OffsetAdapter.dslOffsetToOffset(pair.second))
} yield {
Done.getInstance()
}).transactionally

akka.stream.scaladsl.Flow[Pair[Event, Offset]]
.mapAsync(parallelism = 1) { pair =>

val dbAction = eventHandlers.get(pair.first.getClass)
.map { handler =>
// apply handler if found
val castedHandler = handler.asInstanceOf[(Connection, Event, Offset) => Unit]
SimpleDBIO { ctx => castedHandler(ctx.connection, pair.first, pair.second) }
}
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", pair.first.getClass.getName)
Future.successful(Done.getInstance())
}
}.asJava
.getOrElse {
// fallback to empty action if no handler is found
if (log.isDebugEnabled) log.debug("Unhandled event [{}]", pair.first.getClass.getName)
DBIO.successful(())
}
.flatMap { _ =>
// whatever happens we save the offset
offsetDao.updateOffsetQuery(OffsetAdapter.dslOffsetToOffset(pair.second))
}
.map(_ => Done)

slick.db.run(dbAction.transactionally)

}.asJava
}
}
}
@@ -8,6 +8,7 @@ import java.util.concurrent.CompletionStage

import com.google.inject.Guice
import com.lightbend.lagom.internal.javadsl.persistence.jdbc.JdbcPersistentEntityRegistry
import com.lightbend.lagom.javadsl.persistence.Offset.Sequence
import com.lightbend.lagom.javadsl.persistence.TestEntity.Evt
import com.lightbend.lagom.javadsl.persistence._

@@ -84,25 +84,27 @@ private[lagom] class JdbcReadSideImpl(slick: SlickProvider, offsetStore: SlickOf
}

override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] =
akka.stream.scaladsl.Flow[EventStreamElement[Event]].mapAsync(parallelism = 1) { element =>
eventHandlers.get(element.event.getClass) match {
case Some(handler) =>
slick.db.run {
(for {
_ <- SimpleDBIO { ctx =>
handler.asInstanceOf[(Connection, EventStreamElement[Event]) => Unit](ctx.connection, element)
}
_ <- offsetDao.updateOffsetQuery(element.offset)
} yield {
Done.getInstance()
}).transactionally
Flow[EventStreamElement[Event]]
.mapAsync(parallelism = 1) { element =>

val dbAction = eventHandlers.get(element.event.getClass)
.map { handler =>
val castedHandler = handler.asInstanceOf[(Connection, EventStreamElement[Event]) => Unit]
SimpleDBIO { ctx => castedHandler(ctx.connection, element) }
}
.getOrElse {
// fallback to empty action if no handler is found
if (log.isDebugEnabled) log.debug("Unhandled event [{}]", element.event.getClass.getName)
DBIO.successful(())
}
.flatMap { _ =>
offsetDao.updateOffsetQuery(element.offset)
}
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", element.event.getClass.getName)
Future.successful(Done.getInstance())
.map(_ => Done)

slick.db.run(dbAction.transactionally)

}
}

}
}
@@ -79,22 +79,26 @@ private[lagom] class SlickReadSideImpl(slick: SlickProvider, offsetStore: SlickO
}

override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] =
akka.stream.scaladsl.Flow[EventStreamElement[Event]].mapAsync(parallelism = 1) { element =>
eventHandlers.get(element.event.getClass) match {
case Some(handler) =>
slick.db.run {
(for {
_ <- handler(element)
_ <- offsetDao.updateOffsetQuery(element.offset)
} yield {
Done.getInstance()
}).transactionally
Flow[EventStreamElement[Event]]
.mapAsync(parallelism = 1) { element =>

val dbAction = eventHandlers.get(element.event.getClass)
.map { handler =>
// apply handler if found
handler(element)
}
.getOrElse {
// fallback to empty action if no handler is found
if (log.isDebugEnabled) log.debug("Unhandled event [{}]", element.event.getClass.getName)
DBIO.successful(())
}
.flatMap { _ =>
// whatever happens we save the offset
offsetDao.updateOffsetQuery(element.offset)
}
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", element.event.getClass.getName)
Future.successful(Done.getInstance())
.map(_ => Done)

slick.db.run(dbAction.transactionally)
}
}
}
}
@@ -3,6 +3,7 @@
*/
package com.lightbend.lagom.scaladsl.persistence.jdbc

import akka.persistence.query.Sequence
import com.lightbend.lagom.internal.scaladsl.persistence.jdbc.JdbcPersistentEntityRegistry
import com.lightbend.lagom.scaladsl.persistence.TestEntity.Evt
import com.lightbend.lagom.scaladsl.persistence._
@@ -3,6 +3,7 @@
*/
package com.lightbend.lagom.scaladsl.persistence.slick

import akka.persistence.query.Sequence
import com.lightbend.lagom.internal.scaladsl.persistence.jdbc.JdbcPersistentEntityRegistry
import com.lightbend.lagom.scaladsl.persistence.TestEntity.Evt
import com.lightbend.lagom.scaladsl.persistence._
