
Protocol error when connecting consumer to AWS MSK Serverless #1449

Closed
jakewins opened this issue Sep 26, 2022 · 13 comments

@jakewins
Contributor

Describe the bug

When I connect a kafkajs consumer to AWS MSK Serverless, it prints out a protocol error in the log, and then the consumer stops.

Connecting and sending data works fine, as does using the admin client.

To Reproduce

Expected behavior

The consumer should start and listen for messages, and then disconnect. There should be no protocol errors.

Observed behavior

The consumer crashes with a protocol error:

KafkaJSProtocolError: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details
    at createErrorFromCode (/app/node_modules/kafkajs/src/protocol/error.js:581:10)
    at Object.parse (/app/node_modules/kafkajs/src/protocol/requests/joinGroup/v5/response.js:35:11)
    at Connection.send (/app/node_modules/kafkajs/src/network/connection.js:433:35)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)
    at async Broker.[private:Broker:sendRequest] (/app/node_modules/kafkajs/src/broker/index.js:904:14)
    at async Broker.joinGroup (/app/node_modules/kafkajs/src/broker/index.js:395:14)
    at async ConsumerGroup.[private:ConsumerGroup:join] (/app/node_modules/kafkajs/src/consumer/consumerGroup.js:169:23)
    at async /app/node_modules/kafkajs/src/consumer/consumerGroup.js:335:9
    at async Runner.start (/app/node_modules/kafkajs/src/consumer/runner.js:84:7)
    at async start (/app/node_modules/kafkajs/src/consumer/index.js:243:7)

As far as I can tell, broker logs are not available in MSK Serverless.

Environment:

  • OS: Linux
  • KafkaJS version: 2.2.0
  • Kafka version: MSK Serverless, as running on 2022-09-26
  • NodeJS version: 16
@jakewins
Contributor Author

jakewins commented Sep 27, 2022

Running the same code through the Java client with debug logging, it looks like it picks protocol version 7:

10:05:24.511 [Thread-0] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test2-1, groupId=test2] Sending JOIN_GROUP request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=7, clientId=consumer-test2-1, correlationId=3) and timeout 305000 to node 2147483551: JoinGroupRequestData(groupId='test2', sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[<snip>]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[<snip>])], reason='')

So it works with version 7 in the java client, but seemingly not with version 5 in kafkajs.

So.. maybe MSK Serverless advertises that it supports protocol version 5, but actually it doesn't? That seems more likely than that kafkajs is incorrectly implementing version 5.

@jakewins
Contributor Author

Update: no, if I force the Java client to use version 5, MSK Serverless still accepts it. But, noting that the Java client fails the first message it sends, saying it needs a memberid, then succeeds when it re-sends a message with the memberid:

10:44:26.589 [Thread-0] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test2-1, groupId=test2] Sending JOIN_GROUP request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-test2-1, correlationId=5) and timeout 305000 to node 2147483551: JoinGroupRequestData(groupId='test2', sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, memberId='consumer-test2-1-b8404ac1-af80-4a24-9dd5-1f525f08372c', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[<snip>]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[<snip>])], reason='rebalance failed due to MemberIdRequiredException')
10:44:29.664 [Thread-0] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test2-1, groupId=test2] Received JOIN_GROUP response from node 2147483551 for request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-test2-1, correlationId=5): JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolType=null, protocolName='range', leader='consumer-test2-1-b8404ac1-af80-4a24-9dd5-1f525f08372c', skipAssignment=false, memberId='consumer-test2-1-b8404ac1-af80-4a24-9dd5-1f525f08372c', members=[JoinGroupResponseMember(memberId='consumer-test2-1-b8404ac1-af80-4a24-9dd5-1f525f08372c', groupInstanceId=null, metadata=[<snip>])])

@Nevon
Collaborator

Nevon commented Sep 27, 2022

saying it needs a memberid, then succeeds when it re-sends a message with the member

This is expected and is how the protocol is supposed to work.

As for the protocol version, the way it should work is that the broker and client compare their lists of supported versions and pick the highest one they both support. So if KafkaJS picked 5, that was the highest protocol version for JoinGroup that both it and MSK support.
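That negotiation can be sketched as follows. This is only an illustration: negotiateVersion and the [min, max] range shapes are hypothetical, not KafkaJS's actual API.

```javascript
// Minimal sketch of API version negotiation: intersect the two supported
// ranges and pick the highest version in the intersection.
function negotiateVersion(clientRange, brokerRange) {
  const min = Math.max(clientRange[0], brokerRange[0]);
  const max = Math.min(clientRange[1], brokerRange[1]);
  if (min > max) throw new Error('no JoinGroup version supported by both sides');
  return max;
}

// KafkaJS implements JoinGroup up to v5 (the error came from the v5 response
// parser); the Java client log above shows the broker accepting v7.
console.log(negotiateVersion([0, 5], [0, 7])); // → 5
```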

You could try creating a regular MSK cluster and connecting to that, since with a regular MSK cluster you should have access to broker logs. That said, I'm not sure the logs will actually tell you anything useful. This is the problem with proprietary SaaS where you don't have access to logs or source, and there isn't even a way to test things without paying them for the privilege of supporting their software.

@sankalpbhatia
Contributor

Hi @jakewins and @Nevon,

I am an engineer with the AWS MSK Serverless team. First of all, thanks for putting together a repro library, it was quite helpful in debugging and reproducing the issue.

We have done an initial investigation and it seems like the JoinGroupRequest from the KafkaJS library doesn't adhere to the Kafka protocol. Using the repro code shared here, we seem to be getting the following request from the client library.

JoinGroupRequestData(groupId='test', sessionTimeoutMs=30000, rebalanceTimeoutMs=60000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='RoundRobinAssigner', metadata=[0, 1, 0, 0, 0, 1, 0, 9, 116, 101, 115, 116, 116, 111, 112, 105, 99, 0, 0, 0, 0])

The metadata field is of interest to us: it encodes the ConsumerProtocolSubscription structure, which contains the following fields:

"fields": [
    { "name": "Topics", "type": "[]string", "versions": "0+" },
    { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
      "default": "null", "zeroCopy": true },
    { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true,
      "fields": [
        { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+", "entityType": "topicName" },
        { "name": "Partitions", "type": "[]int32", "versions": "1+"}
      ]
    }
  ]

Taking a deeper look into the metadata

metadata=[0, 1, 0, 0, 0, 1, 0, 9, 116, 101, 115, 116, 116, 111, 112, 105, 99, 0, 0, 0, 0])

  • The first 2 bytes (short) are the version of the metadata structure: 0, 1 => 1
  • The next 4 bytes (int) are the length of the topics array: 0, 0, 0, 1 => 1
  • The next 2 bytes (short) are the length of the first and only element of the topics array: 0, 9 => 9
  • The next 9 bytes (char array) are that element itself: 116, 101, 115, 116, 116, 111, 112, 105, 99, which is ASCII for "testtopic"
  • The next 4 bytes (int) denote the size of the userData byte array: 0, 0, 0, 0 => 0

Since the protocol version is 1, it is expected that the next 4 bytes of the byte array tell us the size of "ownedPartitions" collection for the topic. However, in these requests, we do not find any.
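The byte-by-byte walkthrough above can be checked mechanically. The following sketch decoder (not MSK's or KafkaJS's actual code; the layout follows the schema quoted earlier) reproduces the finding: the payload is stamped version 1 but ends where a v1 ownedPartitions array should begin.

```javascript
// Sketch decoder for the ConsumerProtocolSubscription bytes reported above.
function decodeSubscription(buf) {
  let offset = 0;
  const version = buf.readInt16BE(offset); offset += 2;
  const topicCount = buf.readInt32BE(offset); offset += 4;
  const topics = [];
  for (let i = 0; i < topicCount; i++) {
    const len = buf.readInt16BE(offset); offset += 2;
    topics.push(buf.toString('utf8', offset, offset + len)); offset += len;
  }
  const userDataLen = buf.readInt32BE(offset); offset += 4;
  offset += Math.max(userDataLen, 0); // a length of -1 would mean null userData
  if (version >= 1 && offset >= buf.length) {
    // v1 requires an ownedPartitions array here, but the payload ends instead
    throw new Error('v1 metadata is missing the ownedPartitions array');
  }
  return { version, topics };
}

// The exact bytes from the malformed request:
const reported = Buffer.from([0, 1, 0, 0, 0, 1, 0, 9, 116, 101, 115, 116, 116, 111, 112, 105, 99, 0, 0, 0, 0]);
try {
  decodeSubscription(reported);
} catch (e) {
  console.log(e.message); // the v1 ownedPartitions array is absent
}
```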

On looking further, I found that the version is always set to 1 for the assigner supported by kafkajs (RoundRobin).

We can also see that at the time of encoding these bytes, the library only adds version, topics, and userData (notice that no ownedPartitions field is added). This conforms to version 0 of the protocol metadata schema, not version 1.

encode({ version, assignment, userData = Buffer.alloc(0) }) {

I have tried a simple fix in my local setup, changing the version of the RoundRobin assigner from 1 to 0. Using the reproducer at https://github.com/jakewins/kafkajs-msk-protocolerr-repro/blob/main/index.ts, I have verified that this works for both MSK Provisioned and MSK Serverless.
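To make the mismatch concrete, here is a sketch encoder for both layouts. It is an illustration based on the schema quoted above, not KafkaJS's actual MemberMetadata code; encodeSubscription and its parameter shapes are hypothetical.

```javascript
// Sketch of both metadata layouts, per the ConsumerProtocolSubscription schema.
function int16(n) { const b = Buffer.alloc(2); b.writeInt16BE(n); return b; }
function int32(n) { const b = Buffer.alloc(4); b.writeInt32BE(n); return b; }
function str(s) { const b = Buffer.from(s, 'utf8'); return Buffer.concat([int16(b.length), b]); }

function encodeSubscription({ version, topics, userData = Buffer.alloc(0), ownedPartitions = [] }) {
  const chunks = [int16(version), int32(topics.length), ...topics.map(str), int32(userData.length), userData];
  if (version >= 1) {
    // v1 appends the ownedPartitions array, which KafkaJS was omitting
    chunks.push(int32(ownedPartitions.length));
    for (const { topic, partitions } of ownedPartitions) {
      chunks.push(str(topic), int32(partitions.length), ...partitions.map(p => int32(p)));
    }
  }
  return Buffer.concat(chunks);
}

const v0 = encodeSubscription({ version: 0, topics: ['testtopic'] });
const v1 = encodeSubscription({ version: 1, topics: ['testtopic'] });
console.log(v0.length, v1.length); // → 21 25
```

A v0 body for "testtopic" is 21 bytes, identical to the reported payload except for the version stamp; a well-formed v1 body with an empty ownedPartitions array would be 25 bytes. In other words, the bug was a v0-shaped body carrying a v1 version number.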

Also note that we are aware this works with MSK Provisioned clusters and vanilla Apache Kafka clusters; the MSK Serverless implementation, however, requires the metadata field to adhere strictly to the protocol.

Please let me know if you agree with the findings here or if we have missed something. I am happy to contribute the fix if needed.

@Nevon
Collaborator

Nevon commented Oct 11, 2022

Thanks for your investigation @sankalpbhatia! I'm curious why MSK Serverless is decoding the embedded protocol metadata. That isn't part of the network protocol, as you can see from the protocol definition where it is defined as just BYTES. The intention of that field is to hold an embedded protocol for the consumers, not the brokers. If you look at where the ConsumerProtocolSubscription is used, as far as I can tell it's only ever on the client-side. Granted, in the old protocol documentation they recommend that consumers implement the same envelope for all assignment protocols, but it's still not part of the network protocol:

Below we define the embedded protocol used by consumer groups. We recommend all consumer implementations follow this format so that tooling will work correctly across all clients.

So I would say it looks like MSK is peeking into client internals from the server-side and assuming that the Java client implementation defines the protocol, which I don't think is quite right.

That said, adding this field still makes sense to do, since it's a requirement for incremental cooperative rebalancing, so I don't particularly see any reason to deviate from the Java implementation and the recommendation from the documentation. However, we need to think carefully about what to do here.

  1. We keep the version at 1 and add the ownedPartitions field. Now we might have a problem when the day comes to support incremental cooperative rebalancing, since we will have v1 assigners that don't behave the way we expect them to.
  2. We "demote" the current assigners to v0. Does this mean that the group will not be able to agree on an assigner to use, since the old members will only have v1 and the new members only v0? I'm not sure if the version is part of the assigner selection.

I'm leaning towards the second option, but we'll need to see what happens when a consumer group has members with both the old and new version.

@sankalpbhatia
Contributor

Thanks @Nevon for the input, and sorry for the delayed reply. The decoding of the protocol metadata in MSK Serverless is currently an internal implementation detail.

I prefer option 2 as well. Looking at the code, I do not see the assigner using version to assign partitions. I will do a quick test with a consumer group containing consumers on the two different versions, and will report back. I will also try to create a pull request in the next couple of working days.

@rajivkr

rajivkr commented Oct 17, 2022

+1 facing the same issue

@sankalpbhatia
Contributor

@Nevon I did a simple test where I ran two consumers: the first had the RoundRobinAssigner version set to 0, while the second had version 1. Running these two against an MSK Provisioned cluster, I observed that:

  1. Both the consumers were able to join the group successfully.
  2. Both consumers were able to assume leadership if one of the consumers died.
  3. Both the consumers were able to receive assignments (n/2 partitions each) when running in parallel.

This makes sense, as the consumer group coordinator in the library currently does not use 'version' to determine which assigner to use:
https://github.com/sankalpbhatia/kafkajs/blob/e6789dd02a3d076afed1840013b9846177acae28/src/consumer/consumerGroup.js#L211
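That selection logic amounts to a lookup by protocol name, with version playing no part. A hypothetical sketch (findAssigner and the object shapes are illustrative, not KafkaJS's actual internals):

```javascript
// Sketch: the coordinator selects an assigner by the protocol name the
// group agreed on; the assigner's 'version' field is never consulted.
function findAssigner(assigners, groupProtocolName) {
  const assigner = assigners.find(a => a.name === groupProtocolName);
  if (!assigner) throw new Error(`Unsupported assigner: ${groupProtocolName}`);
  return assigner;
}

const assigners = [{ name: 'RoundRobinAssigner', version: 0 }];
console.log(findAssigner(assigners, 'RoundRobinAssigner').version); // → 0
```

Because selection keys on the name alone, demoting the assigner from v1 to v0 leaves mixed-version groups working, which matches the test results above.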

I have raised a pull request that changes the RoundRobin assigner version to 0; it passes the existing checks.

@Nevon
Collaborator

Nevon commented Oct 17, 2022

Excellent! In a few minutes, the beta release channel will contain version 2.3.0-beta.1 with this fix. Could you please verify that connecting to MSK serverless works with this version?

@sankalpbhatia
Contributor

Thanks for merging the request. I have verified that version 2.3.0-beta.1 works with MSK Serverless.

@rajivkr

rajivkr commented Oct 18, 2022

Looks to be working fine now, thanks for the fix!

@Nevon
Collaborator

Nevon commented Oct 18, 2022

This fix will be out in v2.2.2.

@Nevon Nevon closed this as completed Oct 18, 2022
@unicomp23

@Nevon why isn't this in kafkajs? or am i missing something?

https://github.com/jmaver-plume/kafkajs-msk-iam-authentication-mechanism
