Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offset not persisted when no matching event handler is found #904

Closed
octonato opened this issue Jul 26, 2017 · 0 comments
Closed

Offset not persisted when no matching event handler is found #904

octonato opened this issue Jul 26, 2017 · 0 comments

Comments

@octonato
Copy link
Member

octonato commented Jul 26, 2017

The CassandraReadSideHandler processes incoming events whenever an event handler is defined for an event. While processing it, the offset is saved in the same transaction.

However, offset is not saved when a handler is not found. This is not considered an error as we simply return Future.successul(Done). Basically, we skip the event.
It would be better to save the offset so this event (and potentially a long list of explicitly ignored events) won't be offered again in case of redeployment/restart.

(code fragment extracted from here)

Flow[EventStreamElement[Event]].mapAsync(parallelism = 1) { elem =>
      handlers.get(elem.event.getClass.asInstanceOf[Class[Event]]) match {
        case Some(handler) => invokeHandler(handler, elem)
        case None =>
          if (log.isDebugEnabled)
            log.debug("Unhandled event [{}]", elem.event.getClass.getName)
          Future.successful(Done)
      }
    }.withAttributes(ActorAttributes.dispatcher(dispatcher))

Alternatively we could have a fluent API similar to what was proposed in #844. In which case, the users will need to explicit declare which events they are not interested.

@TimMoore TimMoore added this to the 1.3.7 milestone Jul 31, 2017
octonato pushed a commit to octonato/lagom that referenced this issue Aug 4, 2017
octonato pushed a commit to octonato/lagom that referenced this issue Aug 4, 2017
@octonato octonato self-assigned this Aug 9, 2017
ignasi35 pushed a commit that referenced this issue Aug 11, 2017
* saves offset for unhandled events
(resolves #904)

* adds logging back for unhandled events

* saves offset for unhandled events for JDBC, JPA and Slick

* slightly improved JPA version

* just formatting

* Extended tests for storing offsets on unhandled events

* bring similater code together - count and offset check next to each other

* typo

* formatting

* uses `batch.addAll` api + extracting `Pair`

* removes offset type parameter

* controller shutdowns of read side after each test

* Using duration's .seconds and .millis instead of ScalaTest Span

* using Batch API in all cases

* checking for empty statements
ignasi35 pushed a commit that referenced this issue Aug 11, 2017
* saves offset for unhandled events
(resolves #904)

* adds logging back for unhandled events

* saves offset for unhandled events for JDBC, JPA and Slick

* slightly improved JPA version

* just formatting

* Extended tests for storing offsets on unhandled events

* bring similater code together - count and offset check next to each other

* typo

* formatting

* uses `batch.addAll` api + extracting `Pair`

* removes offset type parameter

* controller shutdowns of read side after each test

* Using duration's .seconds and .millis instead of ScalaTest Span

* using Batch API in all cases

* checking for empty statements
gmethvin pushed a commit that referenced this issue Aug 14, 2017
* saves offset for unhandled events
(resolves #904)

* adds logging back for unhandled events

* saves offset for unhandled events for JDBC, JPA and Slick

* slightly improved JPA version

* just formatting

* Extended tests for storing offsets on unhandled events

* bring similater code together - count and offset check next to each other

* typo

* formatting

* uses `batch.addAll` api + extracting `Pair`

* removes offset type parameter

* controller shutdowns of read side after each test

* Using duration's .seconds and .millis instead of ScalaTest Span

* using Batch API in all cases

* checking for empty statements
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

No branches or pull requests

3 participants