
Saves offset for unhandled events #915

Merged (15 commits, Aug 11, 2017)
@@ -3,6 +3,7 @@
*/
package com.lightbend.lagom.internal.javadsl.persistence.cassandra

import java.util
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import java.util.{ UUID, List => JList }
@@ -28,44 +29,58 @@ import scala.concurrent.{ ExecutionContext, Future }
* Internal API
*/
private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEvent[Event], Handler](
session: CassandraSession, handlers: Map[Class[_ <: Event], Handler], dispatcher: String
session: CassandraSession,
handlers: Map[Class[_ <: Event], Handler],
dispatcher: String
)(implicit ec: ExecutionContext) extends ReadSideHandler[Event] {

private val log = LoggerFactory.getLogger(this.getClass)

protected def invoke(handler: Handler, event: Event, offset: Offset): CompletionStage[JList[BoundStatement]]

override def handle(): Flow[Pair[Event, Offset], Done, _] = {
akka.stream.scaladsl.Flow[Pair[Event, Offset]].mapAsync(parallelism = 1) { pair =>
handlers.get(pair.first.getClass.asInstanceOf[Class[Event]]) match {
case Some(handler) =>
for {
statements <- invoke(handler, pair.first, pair.second).toScala
done <- statements.size match {
case 0 => Future.successful(Done.getInstance())
case 1 => session.executeWrite(statements.get(0)).toScala
case _ =>
val batch = new BatchStatement
val iter = statements.iterator()
while (iter.hasNext)
batch.add(iter.next)
session.executeWriteBatch(batch).toScala
}
} yield done
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", pair.first.getClass.getName)
Future.successful(Done.getInstance())

def executeStatements(statements: JList[BoundStatement]): Future[Done] =
statements.size match {
case 0 => Future.successful(Done.getInstance())

Member Author:
Actually statements.size is never 0, because when we fall back to an empty handler we always add at least one statement (the save-offset statement) to the list.

case 1 => session.executeWrite(statements.get(0)).toScala

Member Author:
We could also argue that since statements are never empty, we could simply skip the size check and always use BatchStatement. The code would be cleaner in that case, with the small penalty of running a batch of a single statement.

def executeStatements(statements: JList[BoundStatement]): Future[Done] = {
  val batch = new BatchStatement
  batch.addAll(statements)
  session.executeWriteBatch(batch).toScala
}

Contributor:
LGTM, we can add that reasoning as a comment and simplify the code.
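
A minimal sketch of what that could look like (same shape as the snippet above, with the reasoning captured as a comment):

def executeStatements(statements: JList[BoundStatement]): Future[Done] = {
  // `statements` is never empty here: even for an unhandled event the fallback
  // handler still appends the save-offset statement, so a single batch write always works.
  val batch = new BatchStatement
  batch.addAll(statements)
  session.executeWriteBatch(batch).toScala
}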

case _ =>
val batch = new BatchStatement
val iter = statements.iterator()
while (iter.hasNext)
batch.add(iter.next)

Contributor:
BatchStatement has an addAll(Iterable) method... not sure why this wasn't using it in the first place.

Member Author:
Didn't know it. Will change for sure.

session.executeWriteBatch(batch).toScala
}
}.withAttributes(ActorAttributes.dispatcher(dispatcher)).asJava

akka.stream.scaladsl.Flow[Pair[Event, Offset]]
.mapAsync(parallelism = 1) { pair =>

val handler =
handlers.getOrElse(
// lookup handler
pair.first.getClass.asInstanceOf[Class[Event]],

Contributor:
I think the cast is not needed (IDK why it was there in the original code)

It also might be nice to destructure the pair at the top of the mapAsync block... with pair.first and pair.second I find I have to keep referring back to understand what each one is.
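
A minimal sketch of the destructured variant (assuming akka.japi.Pair's case-class extractor is usable from Scala; keep the cast if the compiler ends up needing it):

akka.stream.scaladsl.Flow[Pair[Event, Offset]]
  .mapAsync(parallelism = 1) { case Pair(event, offset) =>
    val handler =
      handlers.getOrElse(
        event.getClass,   // cast dropped, as suggested above
        {
          if (log.isDebugEnabled()) log.debug("Unhandled event [{}]", event.getClass.getName)
          CassandraAutoReadSideHandler.emptyHandler[Event, Event].asInstanceOf[Handler]
        }
      )
    invoke(handler, event, offset).toScala.flatMap(executeStatements)
  }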

Member Author:
yep, extracting is a great idea

// fallback to empty handler if none
{
if (log.isDebugEnabled()) log.debug("Unhandled event [{}]", pair.first.getClass.getName)
CassandraAutoReadSideHandler.emptyHandler[Event, Event].asInstanceOf[Handler]
}
)

invoke(handler, pair.first, pair.second).toScala.flatMap(executeStatements)

Member Author:
Instead of this pattern matching, I would have preferred to work with a NoOps handler emitting an empty list of BoundStatement, but I couldn't manage to build one. I'm getting a compilation error (won't give the details here).

A NoOps handler is a much cleaner solution because it increases local reasoning. The following would be a much simpler implementation:

val handler =
  handlers.getOrElse(
    pair.first.getClass.asInstanceOf[Class[Event]],
    NoOpsHandler
  )

invoke(handler, pair.first, pair.second).toScala.flatMap(executeStatements)

The invoke method would always add the offset statement, and we would avoid the branching and excessive logic.

Member Author:
Found a way to encode an emptyHandler. Will push it soon.

}.withAttributes(ActorAttributes.dispatcher(dispatcher)).asJava
}
}

/**
* Internal API
*/
private[cassandra] object CassandraAutoReadSideHandler {

type Handler[Event] = (_ <: Event, Offset) => CompletionStage[JList[BoundStatement]]

def emptyHandler[Event, E <: Event]: Handler[Event] =
(_: E, _: Offset) => Future.successful(util.Collections.emptyList[BoundStatement]()).toJava
}

/**
@@ -95,7 +110,10 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
statements <- (handler.asInstanceOf[(Event, Offset) => CompletionStage[JList[BoundStatement]]].apply(event, offset).toScala)
} yield {
val akkaOffset = OffsetAdapter.dslOffsetToOffset(offset)
TreePVector.from(statements).plus(offsetDao.bindSaveOffset(akkaOffset)).asInstanceOf[JList[BoundStatement]]
TreePVector
.from(statements)
.plus(offsetDao.bindSaveOffset(akkaOffset))
.asInstanceOf[JList[BoundStatement]]

Contributor:
formatting win.

}
}

@@ -115,6 +133,7 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
OffsetAdapter.offsetToDslOffset(dao.loadedOffset)
}).toJava
}

}

/**
@@ -9,6 +9,7 @@ import com.google.inject.Guice
import com.lightbend.lagom.internal.javadsl.persistence.cassandra.{ CassandraPersistentEntityRegistry, CassandraReadSideImpl, JavadslCassandraOffsetStore }
import com.lightbend.lagom.internal.persistence.ReadSideConfig
import com.lightbend.lagom.internal.persistence.cassandra.CassandraReadSideSettings
import com.lightbend.lagom.javadsl.persistence.Offset.TimeBasedUUID
import com.lightbend.lagom.javadsl.persistence._
import com.typesafe.config.ConfigFactory

@@ -20,7 +21,7 @@ object CassandraReadSideSpec {
val noAutoCreateConfig = ConfigFactory.parseString("lagom.persistence.read-side.cassandra.tables-autocreate = false")
}

class CassandraReadSideSpec extends CassandraPersistenceSpec(CassandraReadSideSpec.defaultConfig) with AbstractReadSideSpec {
class CassandraReadSideSpec extends CassandraPersistenceSpec(CassandraReadSideSpec.defaultConfig) with AbstractReadSideSpec[TimeBasedUUID] {
import system.dispatcher

private lazy val injector = Guice.createInjector()
@@ -3,6 +3,8 @@
*/
package com.lightbend.lagom.internal.scaladsl.persistence.cassandra

import java.util

import akka.persistence.query.Offset
import akka.stream.ActorAttributes
import akka.stream.scaladsl.Flow
@@ -32,39 +34,47 @@ private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEve

override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] = {

def invokeHandler(handler: Handler, elem: EventStreamElement[Event]): Future[Done] = {
for {
statements <- invoke(handler, elem)
done <- statements.size match {
case 0 => Future.successful(Done)
case 1 => session.executeWrite(statements.head)
case _ =>
val batch = new BatchStatement
val iter = statements.iterator
while (iter.hasNext)
batch.add(iter.next)
session.executeWriteBatch(batch)
}
} yield done
}

Flow[EventStreamElement[Event]].mapAsync(parallelism = 1) { elem =>
handlers.get(elem.event.getClass.asInstanceOf[Class[Event]]) match {
case Some(handler) => invokeHandler(handler, elem)
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", elem.event.getClass.getName)
Future.successful(Done)
def executeStatements(statements: Seq[BoundStatement]): Future[Done] =
statements.size match {
case 0 => Future.successful(Done)
case 1 => session.executeWrite(statements.head)
case _ =>
val batch = new BatchStatement
val iter = statements.iterator
while (iter.hasNext)
batch.add(iter.next)
session.executeWriteBatch(batch)
}
}.withAttributes(ActorAttributes.dispatcher(dispatcher))

Flow[EventStreamElement[Event]]
.mapAsync(parallelism = 1) { elem =>

val handler =
handlers.getOrElse(
// lookup handler
elem.event.getClass.asInstanceOf[Class[Event]],
// fallback to empty handler if none
{
if (log.isDebugEnabled()) log.debug("Unhandled event [{}]", elem.event.getClass.getName)
CassandraAutoReadSideHandler.emptyHandler.asInstanceOf[Handler]
}
)

invoke(handler, elem).flatMap(executeStatements)

}.withAttributes(ActorAttributes.dispatcher(dispatcher))
}
}

/**
* Internal API
*/
private[cassandra] object CassandraAutoReadSideHandler {

type Handler[Event] = (EventStreamElement[_ <: Event]) => Future[immutable.Seq[BoundStatement]]

def emptyHandler[Event]: Handler[Event] =
(_) => Future.successful(immutable.Seq.empty[BoundStatement])
}

/**
@@ -96,6 +106,9 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
} yield statements :+ offsetDao.bindSaveOffset(element.offset)
}

protected def offsetStatement(offset: Offset): immutable.Seq[BoundStatement] =
immutable.Seq(offsetDao.bindSaveOffset(offset))

override def globalPrepare(): Future[Done] = {
globalPrepareCallback.apply()
}
@@ -3,9 +3,10 @@
*/
package com.lightbend.lagom.scaladsl.persistence.cassandra

import akka.persistence.query.TimeBasedUUID

import scala.concurrent.Future
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory
import com.lightbend.lagom.internal.persistence.ReadSideConfig
import com.lightbend.lagom.internal.persistence.cassandra.CassandraReadSideSettings
@@ -19,7 +20,7 @@ object CassandraReadSideSpec {
val noAutoCreateConfig = ConfigFactory.parseString("lagom.persistence.read-side.cassandra.tables-autocreate = false")
}

class CassandraReadSideSpec extends CassandraPersistenceSpec(CassandraReadSideSpec.defaultConfig, TestEntitySerializerRegistry) with AbstractReadSideSpec {
class CassandraReadSideSpec extends CassandraPersistenceSpec(CassandraReadSideSpec.defaultConfig, TestEntitySerializerRegistry) with AbstractReadSideSpec[TimeBasedUUID] {
import system.dispatcher

override protected lazy val persistentEntityRegistry = new CassandraPersistentEntityRegistry(system)
@@ -95,25 +95,30 @@ private[lagom] class JdbcReadSideImpl @Inject() (slick: SlickProvider, offsetSto
}

override def handle(): Flow[Pair[Event, Offset], Done, Any] = {
akka.stream.scaladsl.Flow[Pair[Event, Offset]].mapAsync(parallelism = 1) { pair =>
eventHandlers.get(pair.first.getClass) match {
case Some(handler) =>
slick.db.run {
(for {
_ <- SimpleDBIO { ctx =>
handler.asInstanceOf[(Connection, Event, Offset) => Unit](ctx.connection, pair.first, pair.second)
}
_ <- offsetDao.updateOffsetQuery(OffsetAdapter.dslOffsetToOffset(pair.second))
} yield {
Done.getInstance()
}).transactionally

akka.stream.scaladsl.Flow[Pair[Event, Offset]]
.mapAsync(parallelism = 1) { pair =>

val dbAction = eventHandlers.get(pair.first.getClass)
.map { handler =>
// apply handler if found
val castedHandler = handler.asInstanceOf[(Connection, Event, Offset) => Unit]
SimpleDBIO { ctx => castedHandler(ctx.connection, pair.first, pair.second) }
}
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", pair.first.getClass.getName)
Future.successful(Done.getInstance())
}
}.asJava
.getOrElse {
// fallback to empty action if no handler is found
if (log.isDebugEnabled) log.debug("Unhandled event [{}]", pair.first.getClass.getName)
DBIO.successful(())
}
.flatMap { _ =>
// whatever happens, we save the offset
offsetDao.updateOffsetQuery(OffsetAdapter.dslOffsetToOffset(pair.second))
}
.map(_ => Done)

Member Author (@octonato, Aug 4, 2017):
Life is so much easier with a good IO monad around.


slick.db.run(dbAction.transactionally)

}.asJava
}
}
}
@@ -8,12 +8,13 @@ import java.util.concurrent.CompletionStage

import com.google.inject.Guice
import com.lightbend.lagom.internal.javadsl.persistence.jdbc.JdbcPersistentEntityRegistry
import com.lightbend.lagom.javadsl.persistence.Offset.Sequence
import com.lightbend.lagom.javadsl.persistence.TestEntity.Evt
import com.lightbend.lagom.javadsl.persistence._

import scala.concurrent.duration._

class JdbcReadSideSpec extends JdbcPersistenceSpec with AbstractReadSideSpec {
class JdbcReadSideSpec extends JdbcPersistenceSpec with AbstractReadSideSpec[Sequence] {
private lazy val injector = Guice.createInjector()
override protected lazy val persistentEntityRegistry = new JdbcPersistentEntityRegistry(system, injector, slick)

@@ -84,25 +84,27 @@ private[lagom] class JdbcReadSideImpl(slick: SlickProvider, offsetStore: SlickOf
}

override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] =
akka.stream.scaladsl.Flow[EventStreamElement[Event]].mapAsync(parallelism = 1) { element =>
eventHandlers.get(element.event.getClass) match {
case Some(handler) =>
slick.db.run {
(for {
_ <- SimpleDBIO { ctx =>
handler.asInstanceOf[(Connection, EventStreamElement[Event]) => Unit](ctx.connection, element)
}
_ <- offsetDao.updateOffsetQuery(element.offset)
} yield {
Done.getInstance()
}).transactionally
Flow[EventStreamElement[Event]]
.mapAsync(parallelism = 1) { element =>

val dbAction = eventHandlers.get(element.event.getClass)
.map { handler =>
val castedHandler = handler.asInstanceOf[(Connection, EventStreamElement[Event]) => Unit]
SimpleDBIO { ctx => castedHandler(ctx.connection, element) }
}
.getOrElse {
// fallback to empty action if no handler is found
if (log.isDebugEnabled) log.debug("Unhandled event [{}]", element.event.getClass.getName)
DBIO.successful(())
}
.flatMap { _ =>
offsetDao.updateOffsetQuery(element.offset)
}
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", element.event.getClass.getName)
Future.successful(Done.getInstance())
.map(_ => Done)

slick.db.run(dbAction.transactionally)

}
}

}
}
@@ -79,22 +79,26 @@ private[lagom] class SlickReadSideImpl(slick: SlickProvider, offsetStore: SlickO
}

override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] =
akka.stream.scaladsl.Flow[EventStreamElement[Event]].mapAsync(parallelism = 1) { element =>
eventHandlers.get(element.event.getClass) match {
case Some(handler) =>
slick.db.run {
(for {
_ <- handler(element)
_ <- offsetDao.updateOffsetQuery(element.offset)
} yield {
Done.getInstance()
}).transactionally
Flow[EventStreamElement[Event]]
.mapAsync(parallelism = 1) { element =>

val dbAction = eventHandlers.get(element.event.getClass)
.map { handler =>
// apply handler if found
handler(element)
}
.getOrElse {
// fallback to empty action if no handler is found
if (log.isDebugEnabled) log.debug("Unhandled event [{}]", element.event.getClass.getName)
DBIO.successful(())
}
.flatMap { _ =>
// whatever happens, we save the offset
offsetDao.updateOffsetQuery(element.offset)
}
case None =>
if (log.isDebugEnabled)
log.debug("Unhandled event [{}]", element.event.getClass.getName)
Future.successful(Done.getInstance())
.map(_ => Done)

slick.db.run(dbAction.transactionally)
}
}
}
}