Consumer gets stuck when it receives a control batch #403

Closed
kkurten opened this issue Jun 20, 2019 · 10 comments

@kkurten

kkurten commented Jun 20, 2019

We have a simple data pipeline where events are consumed from topic A and written to topic B by a transactional (Java) Kafka Streams application. A Node.js microservice then consumes events from topic B (using kafkajs) and writes them to one of our APIs.

We noticed that when kafkajs receives a control batch (https://kafka.apache.org/documentation/#controlbatch) that contains only a single control record, the consumer gets permanently stuck. Since the batch contains only a control record, kafkajs filters that record out and the batch becomes empty. Since no messages are actually consumed, the offset is not advanced. The next fetch receives the same control batch and the same thing happens again. At this point our consumer is effectively in a busy loop, constantly fetching the same data without making progress (CPU usage also spikes noticeably). We have a lot of microservices that use kafkajs, and this issue seems to affect only topics that contain control batches.
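The loop described above can be reproduced with a toy model. This is only a sketch, not kafkajs code: `log`, `fetchFrom`, `consume`, and the record shape are hypothetical names made up for illustration, and the "broker" returns one record per fetch to keep things small.

```javascript
// Toy model of the stuck consumer. NOT kafkajs internals; all names are hypothetical.
const log = [
  { offset: 38186, isControlRecord: false, value: 'event' },
  { offset: 38187, isControlRecord: true }, // transaction marker
  { offset: 38188, isControlRecord: false, value: 'next event' },
]

// Pretend broker: returns the single-record batch that starts at `offset`.
const fetchFrom = offset => log.filter(record => record.offset === offset)

function consume(startOffset, maxFetches, skipControlRecords) {
  let nextOffset = startOffset
  const delivered = []
  for (let i = 0; i < maxFetches; i++) {
    const records = fetchFrom(nextOffset)
    if (records.length === 0) break // caught up with the end of the log

    // Control records are filtered out before the user sees the batch.
    const messages = records.filter(record => !record.isControlRecord)
    for (const message of messages) {
      delivered.push(message.value)
      nextOffset = message.offset + 1
    }

    if (messages.length === 0 && skipControlRecords) {
      // Advance past the marker even though nothing was delivered.
      nextOffset = records[records.length - 1].offset + 1
    }
  }
  return { nextOffset, delivered }
}

const stuck = consume(38187, 5, false) // offset never moves past the marker
const fixed = consume(38187, 5, true) // marker is skipped, next event is delivered
console.log(stuck, fixed)
```

With `skipControlRecords` set to false, every fetch starts at the same offset again, which matches the repeated Fetch requests in the logs below.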

I have not experienced this issue with the Java consumer, but I'm not sure whether this is actually a bug in kafkajs or "working as intended". We are currently using the eachMessage handler in our kafkajs microservice, so we don't have much control over how offsets are managed. Should we just use eachBatch in these kinds of scenarios to manually advance the offset, or is there something in the kafkajs implementation that could be improved to avoid this situation completely?

Kafka version

2.2

Kafkajs version

1.8

Kafka consumer group offsets

TOPIC           PARTITION          CURRENT-OFFSET   LOG-END-OFFSET     LAG             CONSUMER-ID                                                  HOST                        CLIENT-ID
test-topic-1    0                  38187            56935              18748           test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a       127.0.0.1/127.0.0.1         test-microservice
test-topic-1    1                  33034            57693              24659           test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a       127.0.0.1/127.0.0.1         test-microservice
test-topic-1    2                  35574            55538              19964           test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a       127.0.0.1/127.0.0.1         test-microservice

Kafkajs logs

{"level":"DEBUG","timestamp":"2019-06-20T11:16:41.976Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"192.222.134.113:9092","clientId":"test-microservice","ssl":true,"sasl":true}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.041Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 2)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":0,"expectResponse":true,"size":35}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.051Z","logger":"kafkajs","message":"[Connection] Response ApiVersions(key: 18, version: 2)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":0,"size":278,"data":{"errorCode":0,"apiVersions":[{"apiKey":0,"minVersion":0,"maxVersion":7},{"apiKey":1,"minVersion":0,"maxVersion":10},{"apiKey":2,"minVersion":0,"maxVersion":5},{"apiKey":3,"minVersion":0,"maxVersion":7},{"apiKey":4,"minVersion":0,"maxVersion":2},{"apiKey":5,"minVersion":0,"maxVersion":1},{"apiKey":6,"minVersion":0,"maxVersion":5},{"apiKey":7,"minVersion":0,"maxVersion":2},{"apiKey":8,"minVersion":0,"maxVersion":6},{"apiKey":9,"minVersion":0,"maxVersion":5},{"apiKey":10,"minVersion":0,"maxVersion":2},{"apiKey":11,"minVersion":0,"maxVersion":4},{"apiKey":12,"minVersion":0,"maxVersion":2},{"apiKey":13,"minVersion":0,"maxVersion":2},{"apiKey":14,"minVersion":0,"maxVersion":2},{"apiKey":15,"minVersion":0,"maxVersion":2},{"apiKey":16,"minVersion":0,"maxVersion":2},{"apiKey":17,"minVersion":0,"maxVersion":1},{"apiKey":18,"minVersion":0,"maxVersion":2},{"apiKey":19,"minVersion":0,"maxVersion":3},{"apiKey":20,"minVersion":0,"maxVersion":3},{"apiKey":21,"minVersion":0,"maxVersion":1},{"apiKey":22,"minVersion":0,"maxVersion":1},{"apiKey":23,"minVersion":0,"maxVersion":2},{"apiKey":24,"minVersion":0,"maxVersion":1},{"apiKey":25,"minVersion":0,"maxVersion":1},{"apiKey":26,"minVersion":0,"maxVersion":1},{"apiKey":27,"minVersion":0,"maxVersion":0},{"apiKey":28,"minVersion":0,"maxVersion":2},{"apiKey":29,"minVersion":0,"maxVersion":1},{"apiKey":30,"minVersion":0,"maxVersion":1},{"apiKey":31,"minVersion":0,"maxVersion":1},{"apiKey":32,"minVersion":0,"maxVersion":2},{"apiKey":33,"minVersion":0,"maxVersion":1},{"apiKey":34,"minVersion":0,"maxVersion":1},{"apiKey":35,"minVersion":0,"maxVersion":1},{"apiKey":36,"minVersion":0,"maxVersion":1},{"apiKey":37,"minVersion":0,"maxVersion":1},{"apiKey":38,"minVersion":0,"maxVersion":1},{"apiKey":39,"minVersion":0,"maxVersion":1},{"apiKey":40,"minVersion":0,"maxVersion":1},{"apiKey":41,"minVersion":0,"maxVersion":1},{"apiKey":42,"minVersion":0,"maxVersion":1},{"apiKey":43,"minVersion":0,"maxVersion":0}],"throttleTime":0}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.051Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"192.222.134.113:9092","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.054Z","logger":"kafkajs","message":"[Connection] Request SaslHandshake(key: 17, version: 1)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":1,"expectResponse":true,"size":42}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.063Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":1,"size":17,"data":{"errorCode":0,"enabledMechanisms":["PLAIN"]}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.064Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] Authenticate with SASL PLAIN","broker":"192.222.134.113:9092"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.066Z","logger":"kafkajs","message":"[Connection] Request SaslAuthenticate(key: 36, version: 0)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":2,"expectResponse":true,"size":58}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.072Z","logger":"kafkajs","message":"[Connection] Response SaslAuthenticate(key: 36, version: 0)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":2,"size":12,"data":{"errorCode":0,"errorMessage":null,"authBytes":{"type":"Buffer","data":[0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.073Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] SASL PLAIN authentication successful","broker":"192.222.134.113:9092"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.078Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 5)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":3,"expectResponse":true,"size":62}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.088Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":3,"size":304,"data":{"throttleTime":0,"brokers":[{"nodeId":2,"host":"192.222.134.114","port":9092,"rack":"rack2"},{"nodeId":3,"host":"192.222.134.117","port":9092,"rack":"rack3"},{"nodeId":1,"host":"192.222.134.113","port":9092,"rack":"rack1"}],"clusterId":"kyBfsOTOQQmAKWnftYXbnw","controllerId":2,"topicMetadata":[{"topicErrorCode":0,"topic":"test-topic-1","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":1,"replicas":[1,2,3],"isr":[2,3,1],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":2,"leader":3,"replicas":[3,1,2],"isr":[2,3,1],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":1,"leader":2,"replicas":[2,3,1],"isr":[3,2,1],"offlineReplicas":[]}]}]}}
{"level":"INFO","timestamp":"2019-06-20T11:16:42.093Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-consumer"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.097Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"192.222.134.114:9092","clientId":"test-microservice","ssl":true,"sasl":true}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.143Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"192.222.134.114:9092","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.144Z","logger":"kafkajs","message":"[Connection] Request SaslHandshake(key: 17, version: 1)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":0,"expectResponse":true,"size":42}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.153Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":0,"size":17,"data":{"errorCode":0,"enabledMechanisms":["PLAIN"]}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.154Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] Authenticate with SASL PLAIN","broker":"192.222.134.114:9092"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.154Z","logger":"kafkajs","message":"[Connection] Request SaslAuthenticate(key: 36, version: 0)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":1,"expectResponse":true,"size":58}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.165Z","logger":"kafkajs","message":"[Connection] Response SaslAuthenticate(key: 36, version: 0)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":1,"size":12,"data":{"errorCode":0,"errorMessage":null,"authBytes":{"type":"Buffer","data":[0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.166Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] SASL PLAIN authentication successful","broker":"192.222.134.114:9092"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.169Z","logger":"kafkajs","message":"[Connection] Request GroupCoordinator(key: 10, version: 1)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":2,"expectResponse":true,"size":62}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.178Z","logger":"kafkajs","message":"[Connection] Response GroupCoordinator(key: 10, version: 1)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":2,"size":36,"data":{"throttleTime":0,"errorCode":0,"errorMessage":null,"coordinator":{"nodeId":2,"host":"192.222.134.114","port":9092}}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.178Z","logger":"kafkajs","message":"[Cluster] Found group coordinator","nodeId":2}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:42.182Z","logger":"kafkajs","message":"[Connection] Request JoinGroup(key: 11, version: 2)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":3,"expectResponse":true,"size":141}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.276Z","logger":"kafkajs","message":"[Connection] Response JoinGroup(key: 11, version: 2)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":3,"size":254,"data":{"throttleTime":0,"errorCode":0,"generationId":321,"groupProtocol":"RoundRobinAssigner","leaderId":"test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a","memberId":"test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a","members":[{"memberId":"test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a","memberMetadata":{"type":"Buffer","data":[0,1,0,0,0,1,0,20,97,116,45,115,105,108,116,97,45,118,114,111,112,115,45,97,108,101,114,116,0,0,0,0]}}]}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.278Z","logger":"kafkajs","message":"[ConsumerGroup] Chosen as group leader","groupId":"test-consumer","generationId":321,"memberId":"test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a","topics":["test-topic-1"]}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.281Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 5)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":4,"expectResponse":true,"size":62}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.290Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":4,"size":304,"data":{"throttleTime":0,"brokers":[{"nodeId":2,"host":"192.222.134.114","port":9092,"rack":"rack2"},{"nodeId":3,"host":"192.222.134.117","port":9092,"rack":"rack3"},{"nodeId":1,"host":"192.222.134.113","port":9092,"rack":"rack1"}],"clusterId":"kyBfsOTOQQmAKWnftYXbnw","controllerId":2,"topicMetadata":[{"topicErrorCode":0,"topic":"test-topic-1","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":1,"replicas":[1,2,3],"isr":[2,3,1],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":2,"leader":3,"replicas":[3,1,2],"isr":[2,3,1],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":1,"leader":2,"replicas":[2,3,1],"isr":[3,2,1],"offlineReplicas":[]}]}]}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.293Z","logger":"kafkajs","message":"[ConsumerGroup] Group assignment","groupId":"test-consumer","generationId":321,"groupProtocol":"RoundRobinAssigner","assignment":[{"memberId":"test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a","memberAssignment":{"type":"Buffer","data":[0,1,0,0,0,1,0,20,97,116,45,115,105,108,116,97,45,118,114,111,112,115,45,97,108,101,114,116,0,0,0,3,0,0,0,0,0,0,0,2,0,0,0,1,0,0,0,0]}}],"topics":["test-topic-1"]}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.299Z","logger":"kafkajs","message":"[Connection] Request SyncGroup(key: 14, version: 1)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":4,"expectResponse":true,"size":241}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.312Z","logger":"kafkajs","message":"[Connection] Response SyncGroup(key: 14, version: 1)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":4,"size":62,"data":{"throttleTime":0,"errorCode":0,"memberAssignment":{"type":"Buffer","data":[0,1,0,0,0,1,0,20,97,116,45,115,105,108,116,97,45,118,114,111,112,115,45,97,108,101,114,116,0,0,0,3,0,0,0,0,0,0,0,2,0,0,0,1,0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.313Z","logger":"kafkajs","message":"[ConsumerGroup] Received assignment","groupId":"test-consumer","generationId":321,"memberId":"test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a","memberAssignment":{"test-topic-1":[0,2,1]}}
{"level":"INFO","timestamp":"2019-06-20T11:16:45.315Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-consumer","memberId":"test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a","leaderId":"test-microservice-b0093482-f946-4af0-b632-7a31b9c8011a","isLeader":true,"memberAssignment":{"test-topic-1":[0,2,1]},"groupProtocol":"RoundRobinAssigner","duration":3220}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.323Z","logger":"kafkajs","message":"[Connection] Request OffsetFetch(key: 9, version: 3)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":5,"expectResponse":true,"size":103}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.331Z","logger":"kafkajs","message":"[Connection] Response OffsetFetch(key: 9, version: 3)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":5,"size":88,"data":{"throttleTime":0,"responses":[{"topic":"test-topic-1","partitions":[{"partition":0,"offset":"38187","metadata":"","errorCode":0},{"partition":1,"offset":"33034","metadata":"","errorCode":0},{"partition":2,"offset":"35574","metadata":"","errorCode":0}]}],"errorCode":0}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.332Z","logger":"kafkajs","message":"[ConsumerGroup] Fetching from 1 out of 1 topics","topics":["test-topic-1"],"activeTopics":["test-topic-1"],"pausedTopics":[]}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.334Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"192.222.134.117:9092","clientId":"test-microservice","ssl":true,"sasl":true}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.344Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":5,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.345Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":6,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.352Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":5,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.354Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":6,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.389Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"192.222.134.117:9092","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.390Z","logger":"kafkajs","message":"[Connection] Request SaslHandshake(key: 17, version: 1)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":0,"expectResponse":true,"size":42}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.397Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":0,"size":17,"data":{"errorCode":0,"enabledMechanisms":["PLAIN"]}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.398Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] Authenticate with SASL PLAIN","broker":"192.222.134.117:9092"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.400Z","logger":"kafkajs","message":"[Connection] Request SaslAuthenticate(key: 36, version: 0)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":1,"expectResponse":true,"size":58}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.407Z","logger":"kafkajs","message":"[Connection] Response SaslAuthenticate(key: 36, version: 0)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":1,"size":12,"data":{"errorCode":0,"errorMessage":null,"authBytes":{"type":"Buffer","data":[0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.408Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] SASL PLAIN authentication successful","broker":"192.222.134.117:9092"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.409Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":2,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.418Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":2,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.429Z","logger":"kafkajs","message":"[Connection] Request Heartbeat(key: 12, version: 1)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":7,"expectResponse":true,"size":125}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.437Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":7,"size":10,"data":{"throttleTime":0,"errorCode":0}}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.439Z","logger":"kafkajs","message":"[ConsumerGroup] Fetching from 1 out of 1 topics","topics":["test-topic-1"],"activeTopics":["test-topic-1"],"pausedTopics":[]}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.441Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":6,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.441Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":8,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.441Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":3,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.449Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":6,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.451Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":8,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.452Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":3,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.453Z","logger":"kafkajs","message":"[ConsumerGroup] Fetching from 1 out of 1 topics","topics":["test-topic-1"],"activeTopics":["test-topic-1"],"pausedTopics":[]}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.455Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":7,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.455Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":9,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.456Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":4,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.464Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":7,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.466Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":9,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.468Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":4,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.469Z","logger":"kafkajs","message":"[ConsumerGroup] Fetching from 1 out of 1 topics","topics":["test-topic-1"],"activeTopics":["test-topic-1"],"pausedTopics":[]}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.471Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":8,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.471Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":10,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.472Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":5,"expectResponse":true,"size":118}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.480Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.113:9092","clientId":"test-microservice","correlationId":8,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.481Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.114:9092","clientId":"test-microservice","correlationId":10,"size":160,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2019-06-20T11:16:45.482Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"192.222.134.117:9092","clientId":"test-microservice","correlationId":5,"size":160,"data":"[filtered]"}

The logs are full of these fetch requests; I copied only a few here as an example.

Example of control batch in kafkajs

  {
    "topic": "test-topic-1",
    "partition": 0,
    "highWatermark": "55682",
    "unfilteredMessages": [
      {
        "magicByte": 2,
        "attributes": 0,
        "timestamp": "1560229527084",
        "offset": "38187",
        "key": {
          "type": "Buffer",
          "data": [
            0,
            0,
            0,
            1
          ]
        },
        "value": {
          "type": "Buffer",
          "data": [
            0,
            0,
            0,
            0,
            0,
            34
          ]
        },
        "headers": {},
        "isControlRecord": true,
        "batchContext": {
          "firstOffset": "38187",
          "firstTimestamp": "1560229527084",
          "partitionLeaderEpoch": 38,
          "inTransaction": true,
          "isControlBatch": true,
          "lastOffsetDelta": 0,
          "producerId": "8000",
          "producerEpoch": 74,
          "firstSequence": -1,
          "maxTimestamp": "1560229527084",
          "magicByte": 2
        }
      }
    ],
    "messages": []
  }
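
For readers who want to interpret the key and value bytes in the dump above, here is a minimal decoding sketch. It assumes the control record layout from the Kafka message format documentation (key: int16 version followed by int16 type, where 0 is an abort marker and 1 is a commit marker) and, for the value, the end-transaction marker layout (int16 version followed by int32 coordinator epoch). `decodeControlRecord` is a hypothetical helper and is not part of kafkajs.

```javascript
// Hypothetical helper: decode the key/value of a transaction control record.
// Assumed layout: key = int16 version + int16 type,
// value = int16 version + int32 coordinator epoch (end-transaction marker).
const CONTROL_TYPES = { 0: 'ABORT', 1: 'COMMIT' }

function decodeControlRecord({ key, value }) {
  const keyBuffer = Buffer.from(key.data)
  const valueBuffer = Buffer.from(value.data)
  const type = keyBuffer.readInt16BE(2)
  return {
    keyVersion: keyBuffer.readInt16BE(0),
    type: CONTROL_TYPES[type] || `UNKNOWN(${type})`,
    valueVersion: valueBuffer.readInt16BE(0),
    coordinatorEpoch: valueBuffer.readInt32BE(2),
  }
}

// Bytes copied from the record dump above.
const marker = decodeControlRecord({
  key: { type: 'Buffer', data: [0, 0, 0, 1] },
  value: { type: 'Buffer', data: [0, 0, 0, 0, 0, 34] },
})
console.log(marker)
```

Under those assumptions, the record at offset 38187 reads as a commit marker, which fits a transactional producer committing its transaction.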
@arcmode

arcmode commented Jun 21, 2019

I'm facing the same issue and have some findings after some debugging.

Here's what's happening in our case.

  1. A transactional batch with real messages plus a control message at the end arrives
  2. A consumer configured to commit offsets manually processes all the messages except the filtered control message
  3. The immediate next fetch retrieves just the control message (the scenario presented in this issue)
  4. The runner sees the batch as empty and just skips it. The cycle now repeats from step 3

One nasty workaround to escape from the empty batch situation within kafkajs's runner would be to do something like this:

if (batch.isEmpty()) {
    this.consumerGroup.offsetManager.resolvedOffsets[batch.topic][batch.partition] = batch.lastOffset()
    continue
}

instead of (current code):

if (batch.isEmpty()) {
    continue
}

That kind of worked for me, but I don't think it's the proper solution; it only served as a way to confirm the root cause of the issue. Ideally we should just commit the control message's offset when we finish processing the batch, just like what happens with eachBatchAutoResolve: true.

On the other hand, when using eachBatchAutoResolve: false we need to await resolveOffset(batch.lastOffset()) after processing all of the batch messages, so that any control message offset higher than the last real message's offset is also resolved.

A naive solution within kafkajs itself could be to have the runner do

if (this.eachBatchAutoResolve ||
batch.lastOffset() !== batch.messages[batch.messages.length - 1].offset) {
  this.consumerGroup.resolveOffset({ topic, partition, offset: batch.lastOffset() })
}

instead of (current code)

if (this.eachBatchAutoResolve) {
  this.consumerGroup.resolveOffset({ topic, partition, offset: batch.lastOffset() })
}

But this is also not the right solution in my opinion, because some use cases require that no offset is committed by the runner, including control message offsets (stream aggregations, for instance).

Also, I do not know if an empty batch with control message can be generated by committing to a transaction without messages but I'd assume that is the case actually and if we do that we would be again seeing a loop fetching from that control message's offset over and over again.

So I believe that if we avoid creating empty transactional batches on our producers entirely, and on the consumer side do one of:

  • Use eachBatchAutoResolve: true (the default; we can still commit offsets while processing the batch and call heartbeat to make sure we don't lose the session)

or

  • Do await resolveOffset(batch.lastOffset()) after we process all the real messages within eachBatch

Then our consumers should be safe and free of this kind of loop.
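
A minimal sketch of the second option, assuming the eachBatch payload fields documented by kafkajs (`batch`, `resolveOffset`, `heartbeat`, `isRunning`, `isStale`). `makeEachBatch` and `handleMessage` are hypothetical names, and the wiring at the bottom assumes an already connected and subscribed consumer.

```javascript
// Sketch only: resolve batch.lastOffset() after all real messages are handled,
// so a trailing control record's offset is resolved too.
// `makeEachBatch` and `handleMessage` are hypothetical names.
function makeEachBatch(handleMessage) {
  return async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      // Stop early if the consumer is shutting down or the batch is stale.
      if (!isRunning() || isStale()) return
      await handleMessage(message)
      resolveOffset(message.offset)
      await heartbeat()
    }
    // May be higher than the last real message's offset when the batch
    // ends with a control record.
    resolveOffset(batch.lastOffset())
  }
}

// Wiring (assumes `consumer` is a connected, subscribed kafkajs consumer):
// await consumer.run({
//   eachBatchAutoResolve: false,
//   eachBatch: makeEachBatch(async message => { /* write to the API */ }),
// })
```

Note that, as described above, the runner skips empty batches before eachBatch is called, so this only covers the case where the control record trails real messages in the same batch.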

That said, if the runner did not just continue on empty batches that contain a control message, we would not need to care about the possibility of producing empty transactional batches at all: those batches would be passed to eachBatch, and we would have a chance to resolve the control message's offset by either of the two consumer-side means stated previously.

I hope this helps @kkurten and others having problems due to this issue.

PS: This is the first time I interact here so I want to say thanks to @tulios and all contributors for sharing this great work with all of us.

@tulios
Owner

tulios commented Jun 24, 2019

Hey @kkurten and @drojas, thanks for the excellent report. I will investigate what's happening. It's summer holidays in Sweden (where I live), so we are a bit slow to reply this time of the year. We are planning the 1.9 release; I will take a quick look and check if I can include a fix for this in the upcoming release.

@Nevon
Collaborator

Nevon commented Jun 25, 2019

Just wanted to chime in and say that these are some top-notch issues! I wish all issues had this much research put into them and were this clearly explained.

@JaapRood
Collaborator

We had a quick go at consuming messages written transactionally before, and I ran into a similar situation, but was clouded by a whole bunch of other issues. Reading what's happening here, I'm pretty sure that was at least a factor in it. Great find!

@jasine

jasine commented Jul 17, 2019

any solution or workaround?

@tulios
Owner

tulios commented Jul 18, 2019

Not yet, this is almost at the top of my list. 😭
I will give a stab at it soon.

@jasine

jasine commented Sep 6, 2019

any progress on this issue?

@brandonl

brandonl commented Sep 25, 2019

Also, I do not know if an empty batch with control message can be generated by committing to a transaction without messages but I'd assume that is the case actually and if we do that we would be again seeing a loop fetching from that control message's offset over and over again.

I just wanted to chime in and confirm a possible situation @drojas mentioned above. It is indeed possible to get a batch consisting entirely of control messages, and this causes the consumer to get stuck reprocessing the same control batch indefinitely. Using eachBatch with eachBatchAutoResolve: true and

if (batch.isEmpty()) {
    this.consumerGroup.offsetManager.resolvedOffsets[batch.topic][batch.partition] = batch.lastOffset()
}

in runner.js as mentioned seems to work. Perhaps another solution would be to add a member to batch that tracks the filtered messages, so that the runner can see whether there are any abortedTxns or control messages when an empty batch is detected.

@tulios
Owner

tulios commented Sep 30, 2019

I have started looking at this now, and I have a unit test that reproduces the problem (i.e., a control record at the end of the batch).

The Java client auto increments the fetch offset:
https://github.com/apache/kafka/blob/9aa660786e46c1efbf5605a6a69136a1dac6edb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1499-L1505

// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
    return record;
} else {
    // Increment the next fetch offset when we skip a control batch.
    nextFetchOffset = record.offset() + 1;
}

KafkaJS will have to do a similar thing, where you can disable auto-resolve but still be able to fetch the next batch. I will have a PR soon so you can take a look.
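
As a rough JavaScript illustration of the same idea (not the code that shipped in the fix), a hypothetical helper could derive the next fetch offset from the unfiltered records when nothing is left to deliver. `nextFetchOffset` is an invented name; the `unfilteredMessages` and string `offset` fields follow the batch dump earlier in this issue.

```javascript
// Hypothetical helper, not kafkajs code: where should the next fetch start
// after a batch whose records were all filtered out (e.g. only control records)?
function nextFetchOffset(unfilteredMessages, currentOffset) {
  // Nothing was fetched at all: stay where we are.
  if (unfilteredMessages.length === 0) return currentOffset

  // Mirror `record.offset() + 1` from the Java snippet above.
  // Offsets are strings in the batch dump, so BigInt avoids precision loss.
  const last = unfilteredMessages[unfilteredMessages.length - 1]
  return (BigInt(last.offset) + 1n).toString()
}

const next = nextFetchOffset([{ offset: '38187', isControlRecord: true }], '38187')
console.log(next)
```

Keeping this fetch position separate from the resolved or committed offset is one way to let fetching continue while auto-resolve stays disabled.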

@tulios
Owner

tulios commented Oct 1, 2019

Pre-release 1.12.0-beta.0 was published with the fix

tulios closed this as completed Oct 1, 2019