Unable to read messages from MSK Serverless cluster #967

Closed
mtkopone opened this issue Aug 17, 2022 · 19 comments

@mtkopone
Contributor

mtkopone commented Aug 17, 2022

While attempting to switch from an AWS MSK Provisioned cluster to an MSK Serverless cluster, writing messages to topics works, but attempting to read from a topic fails.

Kafka Version

github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a

AWS is very vague about the version of Kafka they are running in the Serverless setup, but the examples here use the client library intended for 2.8.1. (Consuming messages with that Java client works.)

To Reproduce

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
	awsCfg "github.com/aws/aws-sdk-go-v2/config"
	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

func main() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT)
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		sig := <-signals
		fmt.Println("Got signal: ", sig)
		cancel()
	}()

	bootstrapServers := []string{"<kafka-bootstrap-server-url>"}
	topic := "example-topic"

	cfg, _ := awsCfg.LoadDefaultConfig(ctx)
	creds, _ := cfg.Credentials.Retrieve(ctx)
	m := &aws_msk_iam_v2.Mechanism{
		Signer:      signer.NewSigner(),
		Credentials: creds,
		Region:      "us-east-1",
		SignTime:    time.Now(),
		Expiry:      time.Minute * 15,
	}

	config := kafka.ReaderConfig{
		Brokers: bootstrapServers,
		GroupID: "test-consumer-group-1",
		Topic:   topic,
		// Partition: 0,
		MaxWait: 50000 * time.Millisecond,
		Dialer: &kafka.Dialer{
			Timeout:       50 * time.Second,
			DualStack:     true,
			SASLMechanism: m,
			TLS: &tls.Config{
				MinVersion: tls.VersionTLS12,
			},
		},
	}

	r := kafka.NewReader(config)
	fmt.Println("Consumer configuration: ", config)

	defer func() {
		err := r.Close()
		if err != nil {
			fmt.Println("Error closing consumer: ", err)
			return
		}
		fmt.Println("Consumer closed")
	}()

	for {
		m, err := r.ReadMessage(ctx)
		// m, err := r.FetchMessage(ctx)
		if err != nil {
			fmt.Printf("Error reading message: %+v\n", err)
			break
		}
		fmt.Printf("Received message from %s-%d [%d]: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

Expected Behavior

Messages should be received from the topic.

Observed Behavior

Running the example fails.

When a GroupID is provided in the ReaderConfig, the error is:

Error reading message: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details

Log output:

entering loop for consumer group, test-consumer-group-1
Failed to join group test-consumer-group-1: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details
[42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details

Without a GroupID, and with a Partition set, the error is:

Error reading message: unexpected EOF

Log output:

initializing kafka reader for partition 0 of example-topic starting at offset first offset
kafka reader failed to read lag of partition 0 of example-topic: EOF

Additional Context

The topic has been created beforehand, and messages have been written to that topic.

I have also tried most of the possible combinations of TLSConfig.MinVersion and MaxVersion available, but to no avail.

@mtkopone mtkopone added the bug label Aug 17, 2022
@mtkopone mtkopone changed the title Unable to read messages from msk-serverless Unable to read messages from MSK Serverless cluster Aug 17, 2022
@rhansen2 rhansen2 self-assigned this Aug 19, 2022
@rhansen2
Collaborator

Hi @mtkopone,

Thanks for the detailed report!

Unfortunately, based on #948, I don't think we're compatible with 2.8.1, so that could be the problem.

You could give https://github.com/rhansen2/kafka-go/tree/consumer-group-client a try. It makes changes to the Reader, including support for more API versions, but it is a WIP.

It doesn't seem like a TLS issue, but have you tried with InsecureSkipVerify in your TLS config?

@kikyomits
Contributor

kikyomits commented Aug 30, 2022

@mtkopone Sorry for jumping into the discussion. I have an MSK cluster (not serverless, though) running 2.8.1 with the following library versions, and it works fine. So 2.8.1 itself shouldn't be the problem.

github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a

The error is EOF, which usually happens when there is a network connection failure. I ran into this error while contributing to the aws_msk_iam_v2 module.

My gut feeling is that the MSK port you are connecting to is worth double-checking. For IAM authentication, you need to connect via port 9098 for access from within AWS and port 9198 for public access. AWS documents this here. Also, please double-check that you enabled IAM authentication when you created the MSK cluster.
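
A minimal sketch of the port check described above, using only the Go standard library. The two port numbers come from the comment; `brokerAddr`, `probeTLS`, and the command-line handling are hypothetical helpers for illustration and are not part of kafka-go. The probe only confirms that a TCP connection and TLS handshake succeed; it does not attempt SASL/IAM authentication.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net"
	"os"
	"strconv"
	"time"
)

// Ports for IAM-authenticated listeners, as quoted in the comment above.
const (
	iamPortPrivate = 9098 // access from within AWS
	iamPortPublic  = 9198 // public access
)

// brokerAddr is a hypothetical helper that appends the IAM port to a broker host.
func brokerAddr(host string, public bool) string {
	port := iamPortPrivate
	if public {
		port = iamPortPublic
	}
	return net.JoinHostPort(host, strconv.Itoa(port))
}

// probeTLS checks that a TCP connection and a TLS handshake succeed.
// It does not perform SASL/IAM authentication.
func probeTLS(addr string, timeout time.Duration) error {
	d := &net.Dialer{Timeout: timeout}
	conn, err := tls.DialWithDialer(d, "tcp", addr, &tls.Config{MinVersion: tls.VersionTLS12})
	if err != nil {
		return fmt.Errorf("dial %s: %w", addr, err)
	}
	return conn.Close()
}

func main() {
	if len(os.Args) < 2 {
		fmt.Println("usage: probe <broker-host>")
		return
	}
	addr := brokerAddr(os.Args[1], false)
	if err := probeTLS(addr, 5*time.Second); err != nil {
		fmt.Println("not reachable:", err)
		return
	}
	fmt.Println("TLS handshake ok:", addr)
}
```

If the handshake succeeds but the reader still fails, the network path is probably not the culprit and the problem is more likely at the protocol level.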

@rhansen2
Collaborator

rhansen2 commented Sep 2, 2022

Thanks for the additional insight @kikyomits.

It also appears there's a bug in the v2 IAM package integration: #976

The error messages appear different but could also be related.

@mtkopone
Contributor Author

mtkopone commented Sep 9, 2022

> My gut feeling is that the MSK port you are connecting to is worth double-checking. For IAM authentication, you need to connect via port 9098 for access from within AWS and port 9198 for public access. AWS documents this here. Also, please double-check that you enabled IAM authentication when you created the MSK cluster.

I double-checked the port, and it's correct. As for IAM, that's the only option for MSK Serverless, so it's enabled.

@kikyomits
Contributor

kikyomits commented Sep 15, 2022

@mtkopone Interesting. Can you try connecting to your MSK Serverless cluster via the CLI? The steps are well documented in this blog post:
https://aws.amazon.com/blogs/big-data/securing-apache-kafka-is-easy-and-familiar-with-iam-access-control-for-amazon-msk

The section "Create a Apache Kafka topic from an EC2 instance" is the one I want you to try. It walks through the following steps:

  1. Create an EC2 instance in the network where your application is running, so that it can reach MSK
  2. Download the Kafka client library
  3. Download the MSK IAM JAR file
  4. Set up the client-config.properties file
  5. Run a Kafka client command (e.g. list topics), which calls the MSK API using IAM authentication

That will help me understand where the root cause of your issue lies.

@mtkopone
Contributor Author

mtkopone commented Sep 15, 2022

> @mtkopone Interesting. Can you try connecting to your MSK Serverless cluster via the CLI? The steps are well documented in this blog post:
> https://aws.amazon.com/blogs/big-data/securing-apache-kafka-is-easy-and-familiar-with-iam-access-control-for-amazon-msk

Yeah. The Java client library in that example works as a consumer.

Same endpoint, same config.

@fuatto

fuatto commented Oct 6, 2022

> @mtkopone Sorry for jumping into the discussion. I have an MSK cluster (not serverless, though) running 2.8.1 with the following library versions, and it works fine. So 2.8.1 itself shouldn't be the problem.
>
> github.com/segmentio/kafka-go v0.4.34
> github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a
>
> The error is EOF, which usually happens when there is a network connection failure. I ran into this error while contributing to the aws_msk_iam_v2 module.
>
> My gut feeling is that the MSK port you are connecting to is worth double-checking. For IAM authentication, you need to connect via port 9098 for access from within AWS and port 9198 for public access. AWS documents this here. Also, please double-check that you enabled IAM authentication when you created the MSK cluster.

^ I had the same issue, and the hint about a network connection failure helped me. It turns out I was missing the TLS config (even though I had set InsecureSkipVerify to true). Thanks, @kikyomits!

@mtkopone
Contributor Author

mtkopone commented Oct 17, 2022

I upgraded to the latest versions, and the same errors still occur against MSK Serverless with the following:

github.com/segmentio/kafka-go v0.4.35
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20221014170723-bef291169c84

Also tried with InsecureSkipVerify in TLSConfig. Same problem.

On a different tack, it looks like the JavaScript kafkajs library has a very similar issue: tulios/kafkajs#1449. An AWS engineer pinpointed where the MSK Serverless protocol differs from the one used by that library in tulios/kafkajs#1449 (comment). Could that shed some light on this issue?

@sankalpbhatia

sankalpbhatia commented Oct 17, 2022

@mtkopone Using debug logging, is it possible to extract the JoinGroupRequestData from the JoinGroup requests sent by this library, especially the embedded protocol metadata? Once we have that, we should check whether we have the same issue as described here: tulios/kafkajs#1449 (comment)

@mtkopone
Contributor Author

It wasn't the JoinGroupRequest.

Debugging further: MSK Serverless reports that it supports MetadataRequests in API versions 0 to 11, but it looks like any MetadataRequest with an API version below 6 only gets an EOF as a response.
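
To illustrate why the choice of request version matters here, below is a rough sketch of picking the highest API version that both sides support. It is not kafka-go's actual implementation; `versionRange` and `negotiate` are hypothetical names, and only the broker range 0 to 11 and the version 6 threshold come from the observation above.

```go
package main

import (
	"errors"
	"fmt"
)

// versionRange is an inclusive range of API versions, as advertised in an
// ApiVersions response or implemented by a client.
type versionRange struct {
	Min, Max int16
}

var errNoCommonVersion = errors.New("no API version supported by both sides")

// negotiate returns the highest version contained in both ranges.
func negotiate(client, broker versionRange) (int16, error) {
	lo, hi := client.Min, client.Max
	if broker.Min > lo {
		lo = broker.Min
	}
	if broker.Max < hi {
		hi = broker.Max
	}
	if lo > hi {
		return 0, errNoCommonVersion
	}
	return hi, nil
}

func main() {
	// Metadata range reported by MSK Serverless in this thread.
	broker := versionRange{Min: 0, Max: 11}

	// A client that only implements Metadata v1 ends up on v1, which is
	// below the version 6 threshold observed above.
	v, err := negotiate(versionRange{Min: 1, Max: 1}, broker)
	fmt.Println(v, err)

	// A client that also implements v6 ends up on v6.
	v, err = negotiate(versionRange{Min: 1, Max: 6}, broker)
	fmt.Println(v, err)
}
```

Picking the highest common version rather than the lowest keeps the client away from the low versions that the broker advertises but, per the observation above, answers with an EOF.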

@sankalpbhatia

Is it possible to share the request and response for the ApiVersions API, as well as the Metadata request?

@mtkopone
Contributor Author

mtkopone commented Oct 21, 2022

Sure thing, here you go.
All bytes are shown in decimal (base 10).

ApiVersions request: (clientId = "1")
[0 0 0 11 0 18 0 0 0 0 0 1 0 1 49]

ApiVersions response:

[0 0 1 90 0 0 0 1 0 0 0 0 0 56
 0 0 0 0 0 9
 0 1 0 0 0 12
 0 2 0 0 0 6 
 0 3 0 0 0 11 
 0 4 0 0 0 5 
 0 5 0 0 0 3 
 0 6 0 0 0 7 
 0 7 0 0 0 3
 0 8 0 0 0 8 
 0 9 0 0 0 7 
 0 10 0 0 0 3 0 11 0 0 0 7 0 12 0 0 0 4 0 13 0 0 0 4 0 14 0 0 0 5 0 15 0 0 0 5 0 16 0 0 0 4 0 17 0 0 0 1 0 18 0 0 0 3 0 19 0 0 0 7 
 0 20 0 0 0 6 0 21 0 0 0 2 0 22 0 0 0 4 0 23 0 0 0 4 0 24 0 0 0 3 0 25 0 0 0 3 0 26 0 0 0 3 0 27 0 0 0 1 0 28 0 0 0 3 0 29 0 0 0 2 
 0 30 0 0 0 2 0 31 0 0 0 2 0 32 0 0 0 4 0 33 0 0 0 2 0 34 0 0 0 2 0 35 0 0 0 2 0 36 0 0 0 2 0 37 0 0 0 3 0 38 0 0 0 2 0 39 0 0 0 2 
 0 40 0 0 0 2 0 41 0 0 0 2 0 42 0 0 0 2 0 43 0 0 0 2 0 44 0 0 0 1 0 45 0 0 0 0 0 46 0 0 0 0 0 47 0 0 0 0 0 48 0 0 0 1 0 49 0 0 0 1 
 0 50 0 0 0 0 0 51 0 0 0 0 0 56 0 0 0 0 0 57 0 0 0 0 0 60 0 0 0 0 0 61]

Which gets parsed to:

map[Produce:Produce[v0:v9] Fetch:Fetch[v0:v12] ListOffsets:ListOffsets[v0:v6] Metadata:Metadata[v0:v11] LeaderAndIsr:LeaderAndIsr[v0:v5] StopReplica:StopReplica[v0:v3] UpdateMetadata:UpdateMetadata[v0:v7] ControlledShutdown:ControlledShutdown[v0:v3] OffsetCommit:OffsetCommit[v0:v8] OffsetFetch:OffsetFetch[v0:v7] FindCoordinator:FindCoordinator[v0:v3] JoinGroup:JoinGroup[v0:v7] Heartbeat:Heartbeat[v0:v4] LeaveGroup:LeaveGroup[v0:v4] SyncGroup:SyncGroup[v0:v5] DescribeGroups:DescribeGroups[v0:v5] ListGroups:ListGroups[v0:v4] SaslHandshake:SaslHandshake[v0:v1] ApiVersions:ApiVersions[v0:v3] CreateTopics:CreateTopics[v0:v7] DeleteTopics:DeleteTopics[v0:v6] DeleteRecords:DeleteRecords[v0:v2] InitProducerId:InitProducerId[v0:v4] OffsetForLeaderEpoch:OffsetForLeaderEpoch[v0:v4] AddPartitionsToTxn:AddPartitionsToTxn[v0:v3] AddOffsetsToTxn:AddOffsetsToTxn[v0:v3] EndTxn:EndTxn[v0:v3] WriteTxnMarkers:WriteTxnMarkers[v0:v1] TxnOffsetCommit:TxnOffsetCommit[v0:v3] DescribeAcls:DescribeAcls[v0:v2] CreateAcls:CreateAcls[v0:v2] DeleteAcls:DeleteAcls[v0:v2] DescribeConfigs:DescribeConfigs[v0:v4] AlterConfigs:AlterConfigs[v0:v2] AlterReplicaLogDirs:AlterReplicaLogDirs[v0:v2] DescribeLogDirs:DescribeLogDirs[v0:v2] SaslAuthenticate:SaslAuthenticate[v0:v2] CreatePartitions:CreatePartitions[v0:v3] CreateDelegationToken:CreateDelegationToken[v0:v2] RenewDelegationToken:RenewDelegationToken[v0:v2] ExpireDelegationToken:ExpireDelegationToken[v0:v2] DescribeDelegationToken:DescribeDelegationToken[v0:v2] DeleteGroups:DeleteGroups[v0:v2] ElectLeaders:ElectLeaders[v0:v2] IncrementalAlfterConfigs:IncrementalAlfterConfigs[v0:v1] AlterPartitionReassignments:AlterPartitionReassignments[v0:v0] ListPartitionReassignments:ListPartitionReassignments[v0:v0] OffsetDelete:OffsetDelete[v0:v0] 48:48[v0:v1] 49:49[v0:v1] 50:50[v0:v0] 51:51[v0:v0] 56:56[v0:v0] 57:57[v0:v0] 60:60[v0:v0] 61:61[v0:v0]]
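
For anyone who wants to check the parsed result by hand, here is a small sketch that decodes an ApiVersions v0 response laid out as in the dump above: a 4-byte size, a 4-byte correlation ID, a 2-byte error code, a 4-byte entry count, then 6 bytes per entry (API key, min version, max version, all big-endian). The function and type names are made up for this example. The sample input is only the first three entries of the dump, so the decoder reports a truncation error after returning them.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// apiVersionRange is one entry of the ApiVersions response array.
type apiVersionRange struct {
	Key, Min, Max int16
}

// parseAPIVersionsV0 decodes a size-prefixed ApiVersions v0 response.
// On truncated input it returns the entries read so far plus an error.
func parseAPIVersionsV0(b []byte) (int32, int16, []apiVersionRange, error) {
	const headerLen = 4 + 4 + 2 + 4 // size, correlation ID, error code, entry count
	if len(b) < headerLen {
		return 0, 0, nil, errors.New("response shorter than fixed header")
	}
	be := binary.BigEndian
	correlationID := int32(be.Uint32(b[4:8]))
	errorCode := int16(be.Uint16(b[8:10]))
	count := int(int32(be.Uint32(b[10:14])))
	b = b[headerLen:]

	var out []apiVersionRange
	for i := 0; i < count; i++ {
		if len(b) < 6 {
			return correlationID, errorCode, out,
				fmt.Errorf("truncated: got %d of %d entries", i, count)
		}
		out = append(out, apiVersionRange{
			Key: int16(be.Uint16(b[0:2])),
			Min: int16(be.Uint16(b[2:4])),
			Max: int16(be.Uint16(b[4:6])),
		})
		b = b[6:]
	}
	return correlationID, errorCode, out, nil
}

func main() {
	// Header and first three entries copied from the response above.
	sample := []byte{
		0, 0, 1, 90, 0, 0, 0, 1, 0, 0, 0, 0, 0, 56,
		0, 0, 0, 0, 0, 9, // Produce v0..v9
		0, 1, 0, 0, 0, 12, // Fetch v0..v12
		0, 2, 0, 0, 0, 6, // ListOffsets v0..v6
	}
	corr, code, versions, err := parseAPIVersionsV0(sample)
	fmt.Println("correlation ID:", corr, "error code:", code)
	for _, v := range versions {
		fmt.Printf("api key %d: v%d..v%d\n", v.Key, v.Min, v.Max)
	}
	fmt.Println("error:", err)
}
```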

Metadata request: (clientId: "1", topic: "t1")
[0 0 0 19 0 3 0 1 0 0 0 4 0 1 49 0 0 0 1 0 2 116 49]

As far as I understand, these look ok to me...

@rhansen2
Collaborator

@mtkopone Are you still seeing this issue if you use the tip of main? With #947 merged I would expect the Reader and consumer group to be sending the highest supported metadata request version.

@mtkopone
Contributor Author

Yes. The problem remained in main.

The metadata call only used v1. The PR above makes it also support v6, which fixes at least that problem.

@ZhangDahe

I'm seeing the same error with:
github.com/segmentio/kafka-go v0.4.35
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20221021184657-750193894a7e

@mtkopone
Contributor Author

@ZhangDahe would you be able to test if it works using the fork of #1013 ?

@rhansen2
Collaborator

Your merged PR has been included in release https://github.com/segmentio/kafka-go/releases/tag/v0.4.36. Feel free to close this issue if things are working as expected in that version!

@mtkopone
Contributor Author

mtkopone commented Nov 2, 2022

Thank you!

We are working on verifying that nothing else is broken with MSK Serverless. Looks good so far. I will update/close this issue once we have more info.

@rhansen2
Collaborator

Closing this with a note that v0.4.36 and v0.4.37 should be skipped as they contain severe bugs. v0.4.38 contains this fix but with the bugs reverted. Please file a new issue if you're continuing to have trouble. Thanks!
