Saves offset for unhandled events #915
Changes from 6 commits
@@ -3,6 +3,7 @@
  */
 package com.lightbend.lagom.internal.javadsl.persistence.cassandra

+import java.util
 import java.util.concurrent.CompletionStage
 import java.util.function.BiFunction
 import java.util.{ UUID, List => JList }
@@ -28,44 +29,58 @@ import scala.concurrent.{ ExecutionContext, Future }
  * Internal API
  */
 private[cassandra] abstract class CassandraReadSideHandler[Event <: AggregateEvent[Event], Handler](
-  session: CassandraSession, handlers: Map[Class[_ <: Event], Handler], dispatcher: String
+  session: CassandraSession,
+  handlers: Map[Class[_ <: Event], Handler],
+  dispatcher: String
 )(implicit ec: ExecutionContext) extends ReadSideHandler[Event] {

   private val log = LoggerFactory.getLogger(this.getClass)

   protected def invoke(handler: Handler, event: Event, offset: Offset): CompletionStage[JList[BoundStatement]]

   override def handle(): Flow[Pair[Event, Offset], Done, _] = {
-    akka.stream.scaladsl.Flow[Pair[Event, Offset]].mapAsync(parallelism = 1) { pair =>
-      handlers.get(pair.first.getClass.asInstanceOf[Class[Event]]) match {
-        case Some(handler) =>
-          for {
-            statements <- invoke(handler, pair.first, pair.second).toScala
-            done <- statements.size match {
-              case 0 => Future.successful(Done.getInstance())
-              case 1 => session.executeWrite(statements.get(0)).toScala
-              case _ =>
-                val batch = new BatchStatement
-                val iter = statements.iterator()
-                while (iter.hasNext)
-                  batch.add(iter.next)
-                session.executeWriteBatch(batch).toScala
-            }
-          } yield done
-        case None =>
-          if (log.isDebugEnabled)
-            log.debug("Unhandled event [{}]", pair.first.getClass.getName)
-          Future.successful(Done.getInstance())
+    def executeStatements(statements: JList[BoundStatement]): Future[Done] =
+      statements.size match {
+        case 0 => Future.successful(Done.getInstance())
+        case 1 => session.executeWrite(statements.get(0)).toScala
> Review comment: We could also argue that since statements are never empty, we could simply not check the size and always use…
> Reply: LGTM, we can add that reasoning as a comment and simplify the code.
+        case _ =>
+          val batch = new BatchStatement
+          val iter = statements.iterator()
+          while (iter.hasNext)
+            batch.add(iter.next)
> Reply: didn't know it. Will change for sure.
+          session.executeWriteBatch(batch).toScala
+      }
-    }.withAttributes(ActorAttributes.dispatcher(dispatcher)).asJava
+    akka.stream.scaladsl.Flow[Pair[Event, Offset]]
+      .mapAsync(parallelism = 1) { pair =>
+
+        val handler =
+          handlers.getOrElse(
+            // lookup handler
+            pair.first.getClass.asInstanceOf[Class[Event]],
> Review comment: I think the cast is not needed (IDK why it was there in the original code). It also might be nice to destructure the pair at the top of the…
> Reply: yep, extracting is a great idea
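The destructuring suggested in this comment can be sketched with a toy `Pair` class; this is an illustrative stand-in for Akka's `japi.Pair`, not code from the PR:

```scala
// Toy sketch of the reviewer's suggestion: destructure the pair once at the
// top of the block instead of repeating pair.first / pair.second.
// This Pair is a hypothetical stand-in case class, not akka.japi.Pair.
final case class Pair[A, B](first: A, second: B)

def describe(pair: Pair[String, Long]): String = {
  val Pair(event, offset) = pair // bind both components in one pattern
  s"event=$event, offset=$offset"
}
```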
+            // fallback to empty handle if none
+            {
+              if (log.isDebugEnabled()) log.debug("Unhandled event [{}]", pair.first.getClass.getName)
+              CassandraAutoReadSideHandler.emptyHandler[Event, Event].asInstanceOf[Handler]
+            }
+          )
+
+        invoke(handler, pair.first, pair.second).toScala.flatMap(executeStatements)
> Review comment: Instead of this pattern matching, I would have preferred to work with a…
> Reply: Found a way to encode an…
+      }.withAttributes(ActorAttributes.dispatcher(dispatcher)).asJava
   }
 }

 /**
  * Internal API
  */
 private[cassandra] object CassandraAutoReadSideHandler {

   type Handler[Event] = (_ <: Event, Offset) => CompletionStage[JList[BoundStatement]]

+  def emptyHandler[Event, E <: Event]: Handler[Event] =
+    (_: E, _: Offset) => Future.successful(util.Collections.emptyList[BoundStatement]()).toJava
 }
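The `getOrElse`-with-empty-handler lookup introduced above can be sketched with plain collections; the `Handler` type alias and statement strings below are toy stand-ins for the real `CompletionStage`/`BoundStatement` types:

```scala
// Simplified sketch of the fallback lookup: an unknown event type resolves to
// an empty handler that produces no statements, so the stream keeps flowing
// (and, in the real code, the offset still gets saved) instead of
// special-casing the None branch.
type Handler = Long => List[String] // offset => statements (toy types)

val emptyHandler: Handler = _ => Nil // produces no statements

val handlers: Map[String, Handler] =
  Map("UserCreated" -> (offset => List(s"INSERT ... -- offset $offset")))

def statementsFor(eventClass: String, offset: Long): List[String] =
  handlers.getOrElse(eventClass, emptyHandler)(offset)
```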

 /**
@@ -95,7 +110,10 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
       statements <- (handler.asInstanceOf[(Event, Offset) => CompletionStage[JList[BoundStatement]]].apply(event, offset).toScala)
     } yield {
       val akkaOffset = OffsetAdapter.dslOffsetToOffset(offset)
-      TreePVector.from(statements).plus(offsetDao.bindSaveOffset(akkaOffset)).asInstanceOf[JList[BoundStatement]]
+      TreePVector
+        .from(statements)
+        .plus(offsetDao.bindSaveOffset(akkaOffset))
+        .asInstanceOf[JList[BoundStatement]]
> Review comment: formatting win.
     }
   }

@@ -115,6 +133,7 @@ private[cassandra] final class CassandraAutoReadSideHandler[Event <: AggregateEv
       OffsetAdapter.offsetToDslOffset(dao.loadedOffset)
     }).toJava
   }
+
 }

 /**
@@ -95,25 +95,30 @@ private[lagom] class JdbcReadSideImpl @Inject() (slick: SlickProvider, offsetSto
   }

   override def handle(): Flow[Pair[Event, Offset], Done, Any] = {
-    akka.stream.scaladsl.Flow[Pair[Event, Offset]].mapAsync(parallelism = 1) { pair =>
-      eventHandlers.get(pair.first.getClass) match {
-        case Some(handler) =>
-          slick.db.run {
-            (for {
-              _ <- SimpleDBIO { ctx =>
-                handler.asInstanceOf[(Connection, Event, Offset) => Unit](ctx.connection, pair.first, pair.second)
-              }
-              _ <- offsetDao.updateOffsetQuery(OffsetAdapter.dslOffsetToOffset(pair.second))
-            } yield {
-              Done.getInstance()
-            }).transactionally
+    akka.stream.scaladsl.Flow[Pair[Event, Offset]]
+      .mapAsync(parallelism = 1) { pair =>
+
+        val dbAction = eventHandlers.get(pair.first.getClass)
+          .map { handler =>
+            // apply handler if found
+            val castedHandler = handler.asInstanceOf[(Connection, Event, Offset) => Unit]
+            SimpleDBIO { ctx => castedHandler(ctx.connection, pair.first, pair.second) }
+          }
-        case None =>
-          if (log.isDebugEnabled)
-            log.debug("Unhandled event [{}]", pair.first.getClass.getName)
-          Future.successful(Done.getInstance())
-      }
-    }.asJava
+          .getOrElse {
+            // fallback to empty action if no handler is found
+            if (log.isDebugEnabled) log.debug("Unhandled event [{}]", pair.first.getClass.getName)
+            DBIO.successful(())
+          }
+          .flatMap { _ =>
+            // whatever happens, we save the offset
+            offsetDao.updateOffsetQuery(OffsetAdapter.dslOffsetToOffset(pair.second))
+          }
+          .map(_ => Done)
> Review comment: Life is so much easier with a good IO monad around.
+
+        slick.db.run(dbAction.transactionally)
+
+      }.asJava
     }
   }
 }
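The behavioural core of this change — the offset update runs even when no handler matches — comes from chaining the fallback action into the offset query with `flatMap`. A minimal sketch with a toy `DBIO` (not Slick's API; `OffsetStore` and `updateOffset` are hypothetical names used only for illustration):

```scala
// Toy DBIO standing in for Slick's DBIOAction, to show why chaining the
// fallback into the offset update saves the offset for unhandled events too.
final case class DBIO[A](run: () => A) {
  def flatMap[B](f: A => DBIO[B]): DBIO[B] = DBIO(() => f(run()).run())
  def map[B](f: A => B): DBIO[B] = DBIO(() => f(run()))
}
object DBIO { def successful[A](a: A): DBIO[A] = DBIO(() => a) }

// Hypothetical offset store, mimicking offsetDao.updateOffsetQuery.
final class OffsetStore {
  var saved: Option[Long] = None
  def updateOffset(o: Long): DBIO[Unit] = DBIO(() => { saved = Some(o) })
}

def process(handler: Option[Long => DBIO[Unit]], offset: Long, store: OffsetStore): DBIO[String] =
  handler
    .map(h => h(offset))                      // apply handler if found
    .getOrElse(DBIO.successful(()))           // no handler: empty action
    .flatMap(_ => store.updateOffset(offset)) // whatever happens, save the offset
    .map(_ => "Done")
```

Even with `handler = None`, the composed action still writes the offset, which is exactly what the PR title promises.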
> Review comment: Actually, statements.size is never 0, because when we fall back to the empty handler we always add at least one statement (the save-offset statement) to the list.
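The size check that this comment discusses can be sketched as follows; the types are toy stand-ins (not the Datastax driver), and, per the comment, the empty case is unreachable once the save-offset statement is always appended:

```scala
// Simplified sketch of the executeStatements decision: one statement is
// written directly, several are grouped into a single batch write.
// Per the review comment, the Nil case never fires in practice, because the
// save-offset statement is always appended to the list.
sealed trait Write
case object NoOp extends Write
final case class Single(stmt: String) extends Write
final case class Batch(stmts: List[String]) extends Write

def executeStatements(statements: List[String]): Write =
  statements match {
    case Nil           => NoOp // unreachable once the offset statement is appended
    case single :: Nil => Single(single)
    case many          => Batch(many)
  }
```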