Saves offset for unhandled events #915

Merged
merged 15 commits into from Aug 11, 2017

@octonato (Member) commented Aug 4, 2017

Fixes #904

if (log.isDebugEnabled) log.debug("Unhandled event [{}]", pair.first.getClass.getName)
executeStatements(offsetStatement(pair.second))
}

Member Author

Instead of this pattern matching, I would have preferred to work with a NoOps handler emitting an empty list of BoundStatement, but I couldn't manage to build one. I got a compilation error (won't give the details here).

A NoOps handler is a much cleaner solution because it improves local reasoning. The following would be a much simpler implementation.

val handler =
  handlers.getOrElse(
    pair.first.getClass.asInstanceOf[Class[Event]],
    NoOpsHandler
  )

invoke(handler, pair.first, pair.second).toScala.flatMap(executeStatements)

The invoke method would always add the offset statement and we avoid the branching and excessive logic.
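
To make the idea concrete, here is a minimal sketch of the fallback-handler approach using only the Scala standard library. Everything in it is a stand-in invented for illustration: `Statement` is a plain `String` rather than a `BoundStatement`, offsets are `Long`, and `emptyHandler`, `handledHandler` and `process` are hypothetical names, not Lagom API.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object EmptyHandlerSketch {
  // Stand-in types (hypothetical, for illustration only).
  sealed trait Event
  final case class Handled(id: String) extends Event
  final case class Unhandled(id: String) extends Event

  type Statement = String // stands in for BoundStatement
  type Handler   = (Event, Long) => Future[List[Statement]]

  // Fallback handler: emits no statements of its own.
  val emptyHandler: Handler = (_, _) => Future.successful(Nil)

  // A registered handler for one event class.
  val handledHandler: Handler =
    (event, _) => Future.successful(List(s"INSERT $event"))

  val handlers: Map[Class[_], Handler] =
    Map(classOf[Handled] -> handledHandler)

  // Always appends the offset statement, whichever handler ran.
  def invoke(handler: Handler, event: Event, offset: Long): Future[List[Statement]] =
    handler(event, offset).map(_ :+ s"SAVE OFFSET $offset")

  // No branching on "handled vs. unhandled": the lookup falls back to emptyHandler.
  def process(event: Event, offset: Long): Future[List[Statement]] =
    invoke(handlers.getOrElse(event.getClass, emptyHandler), event, offset)
}
```

With this shape, an unhandled event still produces exactly one statement (the offset save), so the caller never needs a separate code path for it.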

Member Author

Found a way to encode an emptyHandler. Will push it soon.

@octonato octonato force-pushed the read-side-offset-persistence branch from 9eb467c to b96a759 Compare August 4, 2017 13:27
// whatever it happens we save the offset
offsetDao.updateOffsetQuery(OffsetAdapter.dslOffsetToOffset(pair.second))
}
.map(_ => Done)
@octonato (Member Author), Aug 4, 2017

Life is so much easier with a good IO monad around.

@octonato (Member Author) commented Aug 4, 2017

It's really a shame that there is so much code duplication here. All classes are doing more or less the same, except the JPA one, which is a beast of its own.

The only difference between them is the usage of collections and Futures. The Java versions are written in Scala, but use CompletionStage, Java collections and Pair. The Scala versions use Future, Scala collections and EventStreamElement.

Obviously there are some crucial differences between the JDBC (Slick) and Cassandra, but still there are a lot of things in common.

@octonato octonato changed the title [WIP] Saves offset for unhandled events Saves offset for unhandled events Aug 7, 2017
count shouldBe expected
}
}

Member Author

The previous version used some methods from akka-testkit. It was a combination of within, awaitAssert and Await.result.

I find the ScalaTest DSL for this kind of async test much cleaner. It also reuses the timeout (PatienceConfig) everywhere in the test, so we don't need to repeat it over and over again.

count should ===(expected)
}
}
private val tag = TestEntity.Evt.AGGREGATE_EVENT_SHARDS.forEntityId("1")
Member Author

Moved tag to an instance val because it's also used in the fetchLastOffset method.

}
private val tag = TestEntity.Evt.AGGREGATE_EVENT_SHARDS.forEntityId("1")

private def createTestEntityRef() = {
Member Author

Creating the entity ref gets its own method.

val handler =
handlers.getOrElse(
// lookup handler
pair.first.getClass.asInstanceOf[Class[Event]],
Contributor

I think the cast is not needed (IDK why it was there in the original code)

It also might be nice to destructure the pair at the top of the mapAsync block... I find with pair.first, pair.second that I have to keep referring back to understand what each one is.
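
As a rough illustration of the destructuring suggestion, the sketch below compares positional accessors with a pattern match that names both parts once. The `Pair` case class here is a hypothetical stand-in defined only for this example, and the element values are made up.

```scala
object DestructurePairSketch {
  // Hypothetical stand-in for the (event, offset) pair carried by the stream.
  final case class Pair[A, B](first: A, second: B)

  val elements: List[Pair[String, Long]] =
    List(Pair("ItemAdded", 1L), Pair("ItemRemoved", 2L))

  // Positional accessors: the reader has to remember what first/second mean.
  val before: List[String] =
    elements.map(pair => s"${pair.first} at offset ${pair.second}")

  // Destructured once at the top of the block: the names carry the meaning.
  val after: List[String] =
    elements.map { case Pair(event, offset) => s"$event at offset $offset" }
}
```

Both versions compute the same result; the second only changes how the two components are named inside the block.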

Member Author

yep, extracting is a great idea

val batch = new BatchStatement
val iter = statements.iterator()
while (iter.hasNext)
batch.add(iter.next)
Contributor

BatchStatement has an addAll(Iterable) method... not sure why this wasn't using it in the first place.

Member Author

Didn't know it. Will change for sure.

.buildHandler()
.prepare(tag)
.toScala
.map(_.asInstanceOf[O]) // no ClassTag, no mapTo :-(
Contributor

Why even parameterize the offset type? AFAICT, nothing actually cares about the type.

Member Author

I guess it comes from my initial wrong assumption that it was always a Long. I had a mapTo[Long] that I had to remove because Cassandra was returning a TimeBasedUUID.

But you are right, we can just keep it as Any. That's all private and internal usage.

Member Author

Or mapTo[Offset], just because I feel bad to return Any. ;-)
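
A small sketch of why `mapTo[Long]` breaks while `mapTo[Offset]` holds, using only `scala.concurrent.Future` from the standard library. The `Offset` hierarchy below is a simplified stand-in written for this example (assumed shape, not the real classes), and `fetchLastOffset` here is a hypothetical helper that just wraps a value.

```scala
import java.util.UUID
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object OffsetMapToSketch {
  // Simplified stand-in for the offset hierarchy (assumption for illustration).
  sealed trait Offset
  final case class Sequence(value: Long) extends Offset
  final case class TimeBasedUUID(value: UUID) extends Offset

  // The store hands back an untyped value.
  def fetchLastOffset(stored: Any): Future[Any] = Future.successful(stored)

  val uuidOffset: Any = TimeBasedUUID(UUID.randomUUID())

  // Narrowing to the common supertype works for every offset flavour.
  val asOffset: Offset =
    Await.result(fetchLastOffset(uuidOffset).mapTo[Offset], 1.second)

  // Narrowing to Long fails with a ClassCastException once the stored
  // value is UUID-based; Try captures that failure.
  val asLong: Try[Long] =
    Try(Await.result(fetchLastOffset(uuidOffset).mapTo[Long], 1.second))
}
```

Note that `mapTo` needs a `ClassTag` for its target type, which is why it was not available while the offset was an unconstrained type parameter.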


readSide
}

private def assertSelectCount(id: String, expected: Long): Unit = {
within(20.seconds) {
awaitAssert {
Contributor

Did you mean to switch this to use eventually like in the Java version?

Member Author

yes, sure. I missed this one

// persisted offset must have change tough
watch(readSide)
system.stop(readSide)
expectTerminated(readSide)
Contributor

Should all of this be in a finally? If the eventually fails, I guess it won't stop the read side here.

Member Author

Good point. I will mix in the BeforeAndAfter trait and add it to an after block instead. I'm afraid that a try/finally would add unnecessary noise to the test method itself.

We are already pushing infrastructure code to helper methods (createReadSideProcessor and createTestEntityRef).

@octonato octonato requested a review from ignasi35 August 9, 2017 08:44
@octonato octonato added this to the 1.3.7 milestone Aug 10, 2017
}
)

invoke(handler, event, offset).toScala.flatMap(executeStatements)
Contributor

nice extract!

TreePVector
.from(statements)
.plus(offsetDao.bindSaveOffset(akkaOffset))
.asInstanceOf[JList[BoundStatement]]
Contributor

formatting win.


import scala.compat.java8.FutureConverters._
import scala.concurrent.Await
import scala.concurrent.duration._

trait AbstractReadSideSpec extends ImplicitSender { spec: ActorSystemSpec =>
trait AbstractReadSideSpec extends ImplicitSender with ScalaFutures with Eventually with BeforeAndAfter { spec: ActorSystemSpec =>
Contributor

👍


def executeStatements(statements: JList[BoundStatement]): Future[Done] =
statements.size match {
case 0 => Future.successful(Done.getInstance())
Member Author

Actually statements.size is never 0, because when we fall back to an empty handler we always add at least one statement (save offset) to the list.

def executeStatements(statements: JList[BoundStatement]): Future[Done] =
statements.size match {
case 0 => Future.successful(Done.getInstance())
case 1 => session.executeWrite(statements.get(0)).toScala
Member Author

We could also argue that since statements are never empty, we could simply not check the size and always use BatchStatement. The code would be cleaner in that case with a small penalty of running a batch of one single statement.

def executeStatements(statements: JList[BoundStatement]): Future[Done] = {
  val batch = new BatchStatement
  batch.addAll(statements)
  session.executeWriteBatch(batch).toScala
}

Contributor

LGTM, we can add that reasoning as a comment and simplify the code.


def executeStatements(statements: JList[BoundStatement]): Future[Done] = {
val batch = new BatchStatement
// statements is never empty, there is at least the store offset statement
Contributor

I think it can be empty for LegacyCassandraReadSideHandler.

Contributor

Is it a problem if you do execute an empty batch or will it just handle as a no-op internally?

Member Author

good point. I will check it

Member Author

I couldn't find out whether this could be a problem, so I added an if/else check.

This only affects the javadsl, because the Scala one does not have a legacy version.
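
The shape of such a guard might look like the sketch below. It is not the merged code: `RecordingSession`, the `String` statements and the local `Done` type are hypothetical stand-ins so the example runs without the Cassandra driver or Akka on the classpath.

```scala
import scala.concurrent.Future

object ExecuteStatementsSketch {
  // Local stand-in for a "completed" marker type (assumption for illustration).
  sealed trait Done
  case object Done extends Done

  type Statement = String // stands in for BoundStatement

  // Hypothetical session that records every batch it is asked to write.
  final class RecordingSession {
    private var batches = Vector.empty[List[Statement]]

    def executeWriteBatch(batch: List[Statement]): Future[Done] = {
      batches :+= batch
      Future.successful(Done)
    }

    def written: Vector[List[Statement]] = batches
  }

  // Skip the round trip entirely when there is nothing to write;
  // otherwise send everything as one batch.
  def executeStatements(session: RecordingSession, statements: List[Statement]): Future[Done] =
    if (statements.isEmpty) Future.successful(Done)
    else session.executeWriteBatch(statements)
}
```

The guard sidesteps the open question of how an empty batch is handled, at the cost of one extra branch.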

@ignasi35 ignasi35 merged commit 03b4ec7 into lagom:master Aug 11, 2017
@ignasi35 (Contributor)

While backporting I had to remove the work done on the SlickReadSide, which is a feature not available in 1.3.x.

ignasi35 pushed a commit that referenced this pull request Aug 11, 2017
* saves offset for unhandled events
(resolves #904)

* adds logging back for unhandled events

* saves offset for unhandled events for JDBC, JPA and Slick

* slightly improved JPA version

* just formatting

* Extended tests for storing offsets on unhandled events

* bring similater code together - count and offset check next to each other

* typo

* formatting

* uses `batch.addAll` api + extracting `Pair`

* removes offset type parameter

* controller shutdowns of read side after each test

* Using duration's .seconds and .millis instead of ScalaTest Span

* using Batch API in all cases

* checking for empty statements
@ignasi35 (Contributor)

backport 1.3.x 8417ce6

gmethvin pushed a commit that referenced this pull request Aug 14, 2017
(same commit message as the Aug 11, 2017 commit above)
@octonato octonato deleted the read-side-offset-persistence branch December 22, 2017 08:41
4 participants