Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.

EventCallback.successPredicateFuture and max_uncommited_events problem #98

Closed
vitold opened this issue Feb 20, 2019 · 2 comments
Closed

Comments

@vitold
Copy link

vitold commented Feb 20, 2019

Let's say we have some asynchronous event batch handler:

Async Handler

    val asyncCallback = EventCallback.successPredicateFuture[SomeEvent] { eventCallbackData =>
      eventCallbackData.subscriptionEvent.events.getOrElse(List.empty).traverse {
        _ =>
          println(s"got offset: ${eventCallbackData.subscriptionEvent.cursor.offset}")
          val promise = Promise[Boolean]()

          system.scheduler.scheduleOnce(Random.nextInt(4).seconds){
            println(s"committing offset: ${eventCallbackData.subscriptionEvent.cursor.offset}")
            promise.success(true)
          }

          promise.future

      }.map(_.forall(b => b))
    }

if we have max_uncommited_events parameter greater then batch_limit then potentially we could receive several batches from the same partition before we commit the offset:

Subscription

subscriptionsClient.eventsStreamedManaged[SomeEvent](
      subscriptionId,
      asyncCallback,
      streamConfig = Subscriptions.StreamConfig(maxUncommittedEvents = Some(5),batchLimit = Some(1))
    )

The output

as a result, we have next output:

got offset: Cursor(Partition(0),001-0001-000000000000002939,EventTypeName(nakadi-client-test),CursorToken(f4455cc6-860a-457f-b52c-dde1beca69b8))
got offset: Cursor(Partition(0),001-0001-000000000000002940,EventTypeName(nakadi-client-test),CursorToken(1734ec11-b3fe-4ece-aa7d-0386fb8d76ac))
got offset: Cursor(Partition(0),001-0001-000000000000002941,EventTypeName(nakadi-client-test),CursorToken(05a22e44-4d56-4c0a-9f7e-64c1dd85ba3b))
got offset: Cursor(Partition(0),001-0001-000000000000002942,EventTypeName(nakadi-client-test),CursorToken(342553ba-fd6b-4753-a18b-3554e7ce2ba3))
got offset: Cursor(Partition(0),001-0001-000000000000002943,EventTypeName(nakadi-client-test),CursorToken(b95d98d5-42af-4b4a-8bc5-31bb68a6b05a))
committing offset: Cursor(Partition(0),001-0001-000000000000002942,EventTypeName(nakadi-client-test),CursorToken(342553ba-fd6b-4753-a18b-3554e7ce2ba3))
got offset: Cursor(Partition(0),001-0001-000000000000002944,EventTypeName(nakadi-client-test),CursorToken(63024c14-e063-4598-8116-9c476d80167c))
got offset: Cursor(Partition(0),001-0001-000000000000002945,EventTypeName(nakadi-client-test),CursorToken(c0632672-dacd-4463-b867-196737ba8345))
got offset: Cursor(Partition(0),001-0001-000000000000002946,EventTypeName(nakadi-client-test),CursorToken(d5189ee0-65ce-464b-b0b7-eb9b1d4127af))
got offset: Cursor(Partition(0),001-0001-000000000000002947,EventTypeName(nakadi-client-test),CursorToken(eba1e032-3be4-4ecc-a7a3-afeed1b2ffb2))
committing offset: Cursor(Partition(0),001-0001-000000000000002939,EventTypeName(nakadi-client-test),CursorToken(f4455cc6-860a-457f-b52c-dde1beca69b8))
committing offset: Cursor(Partition(0),001-0001-000000000000002940,EventTypeName(nakadi-client-test),CursorToken(1734ec11-b3fe-4ece-aa7d-0386fb8d76ac))
committing offset: Cursor(Partition(0),001-0001-000000000000002941,EventTypeName(nakadi-client-test),CursorToken(05a22e44-4d56-4c0a-9f7e-64c1dd85ba3b))
2019-02-20 22:12:36,382 WARN  Subscriptions {org.zalando.kanadi.api.Subscriptions $anonfun$commitCursors$7} - SubscriptionId: c8c40e43-5869-4747-99b5-d1c2c86e6a28, StreamId: 21b07ef9-380b-4c37-b35d-0f2e637c2c84 At least one cursor failed to commit, details are CommitCursorResponse(List(CommitCursorItemResponse(Cursor(Partition(0),001-0001-000000000000002939,EventTypeName(nakadi-client-test),CursorToken(f4455cc6-860a-457f-b52c-dde1beca69b8)),outdated)))
2019-02-20 22:12:36,397 WARN  Subscriptions {org.zalando.kanadi.api.Subscriptions $anonfun$commitCursors$7} - SubscriptionId: c8c40e43-5869-4747-99b5-d1c2c86e6a28, StreamId: 21b07ef9-380b-4c37-b35d-0f2e637c2c84 At least one cursor failed to commit, details are CommitCursorResponse(List(CommitCursorItemResponse(Cursor(Partition(0),001-0001-000000000000002940,EventTypeName(nakadi-client-test),CursorToken(1734ec11-b3fe-4ece-aa7d-0386fb8d76ac)),outdated)))
2019-02-20 22:12:36,411 WARN  Subscriptions {org.zalando.kanadi.api.Subscriptions $anonfun$commitCursors$7} - SubscriptionId: c8c40e43-5869-4747-99b5-d1c2c86e6a28, StreamId: 21b07ef9-380b-4c37-b35d-0f2e637c2c84 At least one cursor failed to commit, details are CommitCursorResponse(List(CommitCursorItemResponse(Cursor(Partition(0),001-0001-000000000000002941,EventTypeName(nakadi-client-test),CursorToken(05a22e44-4d56-4c0a-9f7e-64c1dd85ba3b)),outdated)))

The problem

If the batch with offset (2939 -2941) fails we do not retry the batch, because of the offset indirectly already committed with 000000000000002942.

@mdedetrich
Copy link
Collaborator

Functions like .successPredicateFuture are kind of like the wild west, you are responsible for how you commit cursors, not sure what we can do to help here?

@mdedetrich
Copy link
Collaborator

Going to close issue as there isn't much we can do here

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants