
Barrier causes uncontrolled execution flow #1300

Closed
mark-b-ab opened this issue Feb 22, 2022 · 6 comments

mark-b-ab commented Feb 22, 2022

Describe the bug
When an exception is thrown in onBatch, the error is swallowed (as mentioned in #1064), but the loop over the batches keeps executing.

concurrently should handle this by calling clear, but its promise is resolved anyway, because the job catches the error and passes it to unlockWithError instead of rethrowing it.
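Here is a minimal model of that behaviour. It assumes a simplified barrier (a promise released by unlock() or rejected by unlockWithError(e)) and a one-at-a-time runner standing in for partitionsConsumedConcurrently = 1. The names barrier, runSequentially, fetchBatches and onBatch are hypothetical and are not KafkaJS's source. The caller sees the rebalance error, but the next batch is still processed afterwards:

```javascript
// Hypothetical, simplified model of the barrier + batch loop; not KafkaJS's source.

// A barrier is a promise the caller awaits. It is released successfully with
// unlock() or with an error via unlockWithError(e); only the first call counts,
// because a promise settles only once.
function barrier() {
  let unlock
  let unlockWithError
  const lock = new Promise((resolve, reject) => {
    unlock = resolve
    unlockWithError = reject
  })
  return { lock, unlock, unlockWithError }
}

// Runs jobs one after another (like partitionsConsumedConcurrently = 1).
// A job that resolves normally always lets the next one start.
function runSequentially(jobs) {
  return jobs.reduce((chain, job) => chain.then(job), Promise.resolve())
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

async function fetchBatches(batches, onBatch) {
  const log = []
  const { lock, unlock, unlockWithError } = barrier()
  let completed = 0

  const jobs = batches.map(batch => async () => {
    try {
      await onBatch(batch, log)
    } catch (e) {
      // Reported to the barrier, but swallowed here: the runner sees a
      // successful job and moves on to the next batch.
      unlockWithError(e)
    } finally {
      completed++
      if (completed === batches.length) unlock()
    }
  })

  const allJobs = runSequentially(jobs)
  try {
    await lock
    log.push('all batches done')
  } catch (e) {
    // This is where the real runner would start re-joining the group.
    log.push(`caller saw error: ${e.message}`)
  }
  await allJobs // only so the demo can return the full log
  return log
}

// Partition 0 fails with a rebalance error; partition 1 takes a while.
async function onBatch(batch, log) {
  if (batch.partition === 0) throw new Error('The group is rebalancing')
  await sleep(10)
  log.push(`processed partition ${batch.partition}`)
}

fetchBatches([{ partition: 0 }, { partition: 1 }], onBatch).then(console.log)
```

The printed log has the caller's error first and "processed partition 1" second. The barrier has already reported the failure, yet nothing stops the second batch from running.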

To Reproduce

  1. Run a producer that creates 10 messages (5 per partition) in a topic with 2 partitions.
  2. Run a consumer (partitionsConsumedConcurrently = 1) that subscribes to that topic as a batch consumer and, for each message, logs "start" + message, waits 5 s, then logs "end" + message.
  3. After the first successful "end" message, start a second consumer.
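A handler along these lines should match step 2. This is a sketch, not the author's code. It assumes the consumer resolves and commits each offset and sends a heartbeat after every message, which is consistent with the OffsetCommit/Heartbeat pairs in the logs. makeEachBatch, delayMs and log are hypothetical names. The handler would be passed to consumer.run({ partitionsConsumedConcurrently: 1, eachBatch }):

```javascript
// Hypothetical eachBatch handler for the reproduction steps (not the author's code).
// Wiring it up with KafkaJS would look like:
//   await consumer.run({ partitionsConsumedConcurrently: 1, eachBatch: makeEachBatch() })
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

function makeEachBatch({ delayMs = 5000, log = console.log } = {}) {
  return async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    for (const message of batch.messages) {
      const line = `Partition -> ${batch.partition} Offset -> ${message.offset} Message: ${message.value}`
      log(`⏳ ${line}`) // "start"
      await sleep(delayMs) // simulated work
      log(`✅ ${line}`) // "end"
      resolveOffset(message.offset)
      await commitOffsetsIfNecessary()
      // A rebalance surfaces here as a thrown error (see the Heartbeat lines in the logs).
      await heartbeat()
    }
  }
}
```

To check the handler without a broker, call it with a mock payload that has the same shape: a batch with partition and messages, plus resolveOffset, heartbeat and commitOffsetsIfNecessary.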

Expected behavior
The second message is processed, the heartbeat is sent and fails with a "rebalancing" error, the rebalance starts, and no messages are processed until the rebalance is over.

Observed behavior
The second message is processed, the heartbeat is sent and fails with a "rebalancing" error, messages from the second partition start being processed, and the rebalance begins while they are still in progress, so the two run in parallel.

Logs

➜ nest-playground git:(master) ✗ node dist/src/main.js
[Nest] 62411 - 02/22/2022, 3:41:04 PM LOG [NestFactory] Starting Nest application...
[Nest] 62411 - 02/22/2022, 3:41:04 PM LOG [InstanceLoader] AppModule dependencies initialized +28ms
[Nest] 62411 - 02/22/2022, 3:41:04 PM LOG [InstanceLoader] TypeOrmModule dependencies initialized +0ms
[Nest] 62411 - 02/22/2022, 3:41:04 PM LOG [InstanceLoader] KafkaModule dependencies initialized +0ms
[Nest] 62411 - 02/22/2022, 3:41:04 PM LOG [InstanceLoader] TypeOrmCoreModule dependencies initialized +56ms
[Nest] 62411 - 02/22/2022, 3:41:04 PM DEBUG [Kafka: Connection] Connecting
[Nest] 62411 - 02/22/2022, 3:41:04 PM DEBUG [Kafka: Connection] Request ApiVersions(key: 18, version: 2)
[Nest] 62411 - 02/22/2022, 3:41:04 PM DEBUG [Kafka: Connection] Response ApiVersions(key: 18, version: 2)
[Nest] 62411 - 02/22/2022, 3:41:04 PM DEBUG [Kafka: Broker] Verified support for SaslAuthenticate
[Nest] 62411 - 02/22/2022, 3:41:04 PM DEBUG [Kafka: Connection] Request Metadata(key: 3, version: 6)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Response Metadata(key: 3, version: 6)
[Nest] 62411 - 02/22/2022, 3:41:05 PM LOG [Kafka: Consumer] Starting
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Request GroupCoordinator(key: 10, version: 2)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Response GroupCoordinator(key: 10, version: 2)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Cluster] Found group coordinator
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Request JoinGroup(key: 11, version: 5)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Response JoinGroup(key: 11, version: 5)
The group member needs to have a valid member id before actually entering a consumer group
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Request JoinGroup(key: 11, version: 5)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Response JoinGroup(key: 11, version: 5)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: ConsumerGroup] Chosen as group leader
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Request Metadata(key: 3, version: 6)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Response Metadata(key: 3, version: 6)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: ConsumerGroup] Group assignment
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Request SyncGroup(key: 14, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Response SyncGroup(key: 14, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: ConsumerGroup] Received assignment
[Nest] 62411 - 02/22/2022, 3:41:05 PM LOG [Kafka: ConsumerGroup] Consumer has joined the group
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Request OffsetFetch(key: 9, version: 4)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Response OffsetFetch(key: 9, version: 4)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Request ListOffsets(key: 2, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Response ListOffsets(key: 2, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: ConsumerGroup] Fetching from 2 partitions for 1 out of 1 topics
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Request Fetch(key: 1, version: 11)
[Nest] 62411 - 02/22/2022, 3:41:05 PM DEBUG [Kafka: Connection] Response Fetch(key: 1, version: 11)
⏳ Partition -> 0 Offset -> 0 Message: 1
✅ Partition -> 0 Offset -> 0 Message: 1
[Nest] 62411 - 02/22/2022, 3:41:10 PM DEBUG [Kafka: Connection] Request OffsetCommit(key: 8, version: 5)
[Nest] 62411 - 02/22/2022, 3:41:10 PM DEBUG [Kafka: Connection] Response OffsetCommit(key: 8, version: 5)
[Nest] 62411 - 02/22/2022, 3:41:10 PM DEBUG [Kafka: Connection] Request Heartbeat(key: 12, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:10 PM DEBUG [Kafka: Connection] Response Heartbeat(key: 12, version: 3)
⏳ Partition -> 0 Offset -> 1 Message: 3
-----HERE I STARTED SECOND CONSUMER-----
✅ Partition -> 0 Offset -> 1 Message: 3
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Request OffsetCommit(key: 8, version: 5)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Response OffsetCommit(key: 8, version: 5)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Request Heartbeat(key: 12, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:15 PM ERROR [Kafka: Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
⏳ Partition -> 1 Offset -> 0 Message: 2
[Nest] 62411 - 02/22/2022, 3:41:15 PM WARN [Kafka: Runner] The group is rebalancing, re-joining
The group is rebalancing, so a rejoin is needed
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Request GroupCoordinator(key: 10, version: 2)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Response GroupCoordinator(key: 10, version: 2)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Cluster] Found group coordinator
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Request JoinGroup(key: 11, version: 5)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Response JoinGroup(key: 11, version: 5)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: ConsumerGroup] Chosen as group leader
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Request Metadata(key: 3, version: 6)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Response Metadata(key: 3, version: 6)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: ConsumerGroup] Group assignment
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Request SyncGroup(key: 14, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Response SyncGroup(key: 14, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: ConsumerGroup] Received assignment
[Nest] 62411 - 02/22/2022, 3:41:15 PM LOG [Kafka: ConsumerGroup] Consumer has joined the group
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Request OffsetFetch(key: 9, version: 4)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Response OffsetFetch(key: 9, version: 4)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Request ListOffsets(key: 2, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Response ListOffsets(key: 2, version: 3)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: ConsumerGroup] Fetching from 1 partitions for 1 out of 1 topics
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Request Fetch(key: 1, version: 11)
[Nest] 62411 - 02/22/2022, 3:41:15 PM DEBUG [Kafka: Connection] Response Fetch(key: 1, version: 11)
⏳ Partition -> 1 Offset -> 0 Message: 2
[Nest] 62411 - 02/22/2022, 3:41:15 PM ERROR [Kafka: Runner] Error when calling eachBatch
Error: DUUUUPLICATEEEEEE

Environment:

  • OS: macOS 12.2.1
  • KafkaJS version 1.16.0
  • Kafka version 3.1.0
  • NodeJS version 17.5.0


mark-b-ab (Author)

batches.map(batch =>
  concurrently(async () => {
    try {
      if (!this.running) {
        return
      }
      await onBatch(batch)
    } catch (e) {
      unlockWithError(e)
    } finally {
      numberOfExecutions++
      if (requestsCompleted && numberOfExecutions === expectedNumberOfExecutions) {
        unlock()
      }
    }
  }).catch(unlockWithError)
)

mark-b-ab (Author) commented Feb 22, 2022

Adding throw e; after line 378 solved the issue for me.

This makes concurrently run its cleanup (clear) and discard the rest of the jobs.

I am not sure about the impact on consumers with partitionsConsumedConcurrently > 1.
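The following sketch shows why rethrowing helps. It models a concurrency limiter that clears its queue when a job rejects, similar in spirit to the clear behaviour mentioned above. limitConcurrency and runBatches are hypothetical names, and the real KafkaJS limiter may differ. In particular, this sketch rejects dropped jobs with a "cleared" error, and the real limiter may not. Without the rethrow, the limiter never sees the failure and the remaining batches run. With the rethrow, they are dropped:

```javascript
// Hypothetical limiter: runs at most `limit` jobs at once and, when a job
// rejects, drops every job that has not started yet. Not KafkaJS's source.
function limitConcurrency(limit) {
  const waiting = [] // queued jobs: { start, reject }
  let running = 0

  const clear = () => {
    // Reject dropped jobs so their callers are not left waiting forever.
    for (const job of waiting.splice(0)) job.reject(new Error('cleared'))
  }

  const next = () => {
    if (running < limit && waiting.length > 0) waiting.shift().start()
  }

  return fn =>
    new Promise((resolve, reject) => {
      const start = () => {
        running++
        Promise.resolve()
          .then(fn)
          .then(resolve, error => {
            clear()
            reject(error)
          })
          .finally(() => {
            running--
            next()
          })
      }
      waiting.push({ start, reject })
      next()
    })
}

// The batch loop, with and without the proposed throw e.
async function runBatches(batches, { rethrow }) {
  const concurrently = limitConcurrency(1) // partitionsConsumedConcurrently = 1
  const processed = []
  const onBatch = async batch => {
    if (batch.fail) throw new Error('The group is rebalancing')
    processed.push(batch.id)
  }

  const jobs = batches.map(batch =>
    concurrently(async () => {
      try {
        await onBatch(batch)
      } catch (e) {
        // unlockWithError(e) would be called here in the original loop.
        if (rethrow) throw e // the fix: let the limiter see the failure
      }
    }).catch(() => {}) // the real loop forwards this to unlockWithError; the demo ignores it
  )
  await Promise.all(jobs)
  return processed
}

const batches = [{ id: 'a', fail: true }, { id: 'b' }, { id: 'c' }]
runBatches(batches, { rethrow: false }).then(p => console.log('without rethrow:', p))
runBatches(batches, { rethrow: true }).then(p => console.log('with rethrow:', p))
```

In this sketch, clear() only drops jobs that have not started yet. With a limit above 1, batches that are already in flight keep running after the failure. That may be why the effect on consumers with partitionsConsumedConcurrently > 1 is unclear.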

Nevon (Collaborator) commented Feb 22, 2022

Can you see if this is reproducible with #1258? The barrier is going to be gone as soon as we merge that, in which case we don't need to spend time and effort on debugging this in the current version.

mark-b-ab (Author)

Will check now.

mark-b-ab (Author) commented Feb 22, 2022

@Nevon The concurrent-execution issue does not reproduce on the fork you sent, but there is another issue: the same message is processed twice (even after its offset was committed).

[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: ConsumerGroup] Received assignment
[Nest] 66108 - 02/22/2022, 4:45:21 PM LOG [Kafka: ConsumerGroup] Consumer has joined the group
[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: FetchManager] Starting...
[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: FetchManager] Created 1 fetchers
[Nest] 66108 - 02/22/2022, 4:45:21 PM LOG Consumer started
[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: Connection] Request OffsetFetch(key: 9, version: 4)
[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: Connection] Response OffsetFetch(key: 9, version: 4)
[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: Connection] Request ListOffsets(key: 2, version: 3)
[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: Connection] Response ListOffsets(key: 2, version: 3)
[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: Connection] Connecting
[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: Connection] Request Fetch(key: 1, version: 11)
[Nest] 66108 - 02/22/2022, 4:45:21 PM DEBUG [Kafka: Connection] Response Fetch(key: 1, version: 11)
⏳ Partition -> 1 Offset -> 0 Message: 2
✅ Partition -> 1 Offset -> 0 Message: 2
[Nest] 66108 - 02/22/2022, 4:45:26 PM DEBUG [Kafka: Connection] Request OffsetCommit(key: 8, version: 5)
[Nest] 66108 - 02/22/2022, 4:45:26 PM DEBUG [Kafka: Connection] Response OffsetCommit(key: 8, version: 5)
[Nest] 66108 - 02/22/2022, 4:45:26 PM DEBUG [Kafka: Connection] Request Heartbeat(key: 12, version: 3)
[Nest] 66108 - 02/22/2022, 4:45:26 PM DEBUG [Kafka: Connection] Response Heartbeat(key: 12, version: 3)
⏳ Partition -> 1 Offset -> 1 Message: 4
✅ Partition -> 1 Offset -> 1 Message: 4
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Request OffsetCommit(key: 8, version: 5)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Response OffsetCommit(key: 8, version: 5)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Request Heartbeat(key: 12, version: 3)
[Nest] 66108 - 02/22/2022, 4:45:31 PM ERROR [Kafka: Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Nest] 66108 - 02/22/2022, 4:45:31 PM WARN [Kafka: Runner] The group is rebalancing, re-joining
The group is rebalancing, so a rejoin is needed
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Request GroupCoordinator(key: 10, version: 2)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Response GroupCoordinator(key: 10, version: 2)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Cluster] Found group coordinator
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Request JoinGroup(key: 11, version: 5)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Response JoinGroup(key: 11, version: 5)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: ConsumerGroup] Chosen as group leader
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Request Metadata(key: 3, version: 6)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Response Metadata(key: 3, version: 6)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: ConsumerGroup] Group assignment
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Request SyncGroup(key: 14, version: 3)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: Connection] Response SyncGroup(key: 14, version: 3)
[Nest] 66108 - 02/22/2022, 4:45:31 PM DEBUG [Kafka: ConsumerGroup] Received assignment
[Nest] 66108 - 02/22/2022, 4:45:31 PM LOG [Kafka: ConsumerGroup] Consumer has joined the group
⏳ Partition -> 1 Offset -> 0 Message: 2
[Nest] 66108 - 02/22/2022, 4:45:31 PM ERROR [Kafka: Runner] Error when calling eachBatch
Error: DUUUUPLICATEEEEEE
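In this log, partition 1 offset 0 is handled again after its offset was committed. A guard like the one below can detect that kind of duplicate. This is a guess at how the test consumer raises its duplicate error. It assumes the consumer remembers, in memory, every topic/partition/offset it has already processed. createDuplicateGuard and "test-topic" are hypothetical:

```javascript
// Hypothetical in-memory guard; throws if the same topic/partition/offset
// is processed twice within one process.
function createDuplicateGuard() {
  const seen = new Set()
  return function check(topic, partition, offset) {
    const key = `${topic}:${partition}:${offset}`
    if (seen.has(key)) throw new Error(`Duplicate message ${key}`)
    seen.add(key)
  }
}

const check = createDuplicateGuard()
check('test-topic', 1, '0') // first delivery: ok
check('test-topic', 1, '1') // ok
try {
  check('test-topic', 1, '0') // redelivered after the rebalance
} catch (e) {
  console.log(e.message) // Duplicate message test-topic:1:0
}
```

The set lives only in this process. It catches redelivery to the same consumer, as in the log above, but not redelivery to another instance.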

mark-b-ab (Author)

Closing this issue due to the release of KafkaJS 1.7.0
