Writer error "unexpected EOF" with v0.4.2 + SASL + TLS #499

Closed
pierrre opened this issue Sep 1, 2020 · 17 comments

@pierrre

pierrre commented Sep 1, 2020

Describe the bug
When I try to write a message, I receive an "unexpected EOF" error.

Kafka Version
I'm using the version available from confluent.cloud, so I think it's the latest.

To Reproduce

package main

import (
	"context"
	"crypto/tls"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func main() {
	ctx := context.Background()
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{
			"xxx.gcp.confluent.cloud:9092",
		},
		Topic: "test",
		Dialer: &kafka.Dialer{
			Timeout:   10 * time.Second,
			DualStack: true,
			TLS:       &tls.Config{},
			SASLMechanism: plain.Mechanism{
				Username: "yyy",
				Password: "zzz",
			},
		},
		BatchSize: 1,
		Logger: kafka.LoggerFunc(func(format string, args ...interface{}) {
			log.Printf("Kafka writer: "+format, args...)
		}),
	})
	for {
		err := w.WriteMessages(ctx, kafka.Message{
			Value: []byte(time.Now().Format(time.RFC3339)),
		})
		if err != nil {
			panic(err)
		}
		log.Println("message sent")
		time.Sleep(1 * time.Second)
	}
}

The call to w.WriteMessages returns an error.

Expected behavior
I should not receive any error.

Additional context
I'm using v0.4.2 with TLS + SASL auth.

It was working fine with 0.3.8 and 0.4.0.
0.4.1 has a panic.

I think this is actually the same issue as #490, but I don't understand why that one was closed, since the issue is not fixed.
Someone said #490 (comment), but that's not true, right?

@pierrre pierrre added the bug label Sep 1, 2020
@pierrre
Author

pierrre commented Sep 1, 2020

PS: while testing on a different Kafka cluster, I was able to access the error log on the broker side:

[2020-08-31 14:02:52,081] INFO [SocketServer brokerId=1] Failed authentication with /1.2.3.4 (Invalid SASL/PLAIN response: expected 3 tokens, got 4) (org.apache.kafka.common.network.Selector)

I'm not sure, but it could be related to this code:

return m, []byte(fmt.Sprintf("\x00%s\x00%s", m.Username, m.Password)), nil

However, I don't know where the third \x00 comes from 🤷
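To make the "expected 3 tokens, got 4" message concrete, here is a minimal sketch of how a SASL/PLAIN initial response splits into tokens. It assumes the RFC 4616 layout (authzid NUL authcid NUL passwd) with an empty authzid, as in the kafka-go line quoted above. The helper names `plainInitialResponse` and `countTokens` are hypothetical, and the splitting is a simplified stand-in for the broker-side check, not Kafka's actual code. The extra trailing NUL is only an illustration of how a fourth token can appear, not a diagnosis of where it comes from in v0.4.2.

```go
package main

import (
	"bytes"
	"fmt"
)

// plainInitialResponse builds a SASL/PLAIN initial response in the
// RFC 4616 layout: authzid NUL authcid NUL passwd. The authorization
// identity (authzid) is left empty.
func plainInitialResponse(username, password string) []byte {
	return []byte("\x00" + username + "\x00" + password)
}

// countTokens splits a response on NUL bytes and returns the number of
// pieces. Hypothetical helper: a simplified stand-in for the broker check.
func countTokens(resp []byte) int {
	return len(bytes.Split(resp, []byte{0}))
}

func main() {
	good := plainInitialResponse("yyy", "zzz")
	fmt.Println(countTokens(good)) // 3: "", "yyy", "zzz"

	// Assumed corruption for illustration: one extra NUL byte appended.
	bad := append(append([]byte{}, good...), 0)
	fmt.Println(countTokens(bad)) // 4: "", "yyy", "zzz", ""
}
```

Any single stray NUL byte in the payload, wherever it is added, would be enough to push the count from 3 to 4.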

@tarnowsc

tarnowsc commented Sep 4, 2020

The same issue occurs when using:

scram.Mechanism(scram.SHA256, user, password)

@swarupdonepudi

I get the exact same error and am watching for updates. Also, I see that NewWriter is now deprecated? Can someone please share an example of instantiating a writer in v0.4.2?

@pierrre
Author

pierrre commented Sep 8, 2020

@swarupdonepudi thank you for noticing this!
But the issue is still the same.
Here is my new code:

package main

import (
	"context"
	"crypto/tls"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func main() {
	ctx := context.Background()
	w := &kafka.Writer{
		Addr:  kafka.TCP("xxx.gcp.confluent.cloud:9092"),
		Topic: "test",
		Transport: &kafka.Transport{
			TLS: &tls.Config{},
			SASL: plain.Mechanism{
				Username: "yyy",
				Password: "zzz",
			},
		},
		BatchSize: 1,
		Logger: kafka.LoggerFunc(func(format string, args ...interface{}) {
			log.Printf("Kafka writer: "+format, args...)
		}),
	}
	for {
		err := w.WriteMessages(ctx, kafka.Message{
			Value: []byte(time.Now().Format(time.RFC3339)),
		})
		if err != nil {
			panic(err)
		}
		log.Println("message sent")
		time.Sleep(1 * time.Second)
	}
}

@vtolstov

I think this is a bug in the 0.4.2 release that is fixed in master. I also have a test service that uses 0.3.8 with SASL auth and it works fine, but with 0.4.2 it fails to publish messages. Try using the master branch of kafka-go.

I think we need a next release, such as 0.4.3, that includes the fix for this issue.

@pierrre
Author

pierrre commented Sep 15, 2020

@vtolstov correct me if I'm wrong, but I think the master branch is equal to the tag v0.3.9 currently.
So it doesn't have all the new changes from the 0.4 branch.

@vtolstov

Ah, if you need the 0.4.x branch, then yes, you need to wait for a new 0.4 release.

@zzzhr1990

The current release is now v0.4.5, and this bug is still not fixed.

@scottwhite

@zzzhr1990 confirmed: I tried 0.4.3, 0.4.4, and 0.4.5, and none of them work.

@zzzhr1990

@scottwhite However, 0.3.x works fine.

@icrustandi

I can confirm we're having the same issue here with a Confluent broker.

@zzzhr1990

zzzhr1990 commented Oct 10, 2020

Any fix there?

@jedna

jedna commented Oct 19, 2020

Works again for me with 0.4.6.

@pierrre
Author

pierrre commented Oct 19, 2020

Probably fixed by #541.
I will test it; closing the issue for now.

@pierrre pierrre closed this as completed Oct 19, 2020
@pierrre
Author

pierrre commented Oct 19, 2020

I can confirm, it's fixed in v0.4.6

@achille-roussel
Contributor

Thanks for verifying @pierrre, and thanks everyone for your reports!

@michael-joseph-payne

michael-joseph-payne commented Feb 4, 2022

I ran into this behavior again while trying to hook up a new broker in Confluent Cloud. All attempts at sending a message with a Writer resulted in an unexpected EOF error. Eventually, I narrowed it down to a struct field in Transport. We are using kafka-go v0.4.27 and connect according to the Confluent documentation (SASL Plain / API Key & Secret).

Per the example documentation, we started out using (and had the errors mentioned above):

sharedTransport := &kafka.Transport{
	SASL: mechanism,
}

I went through the Confluent documentation and also took a look at the default config they supply. The clue that tipped me off was that they require TLS 1.2 minimum. Adding that to our Transport struct as below solved the issue for us:

sharedTransport := &kafka.Transport{
	SASL: mechanism,
	// If we don't include this we get unexpected EOF errors
	TLS: &tls.Config{
		MinVersion: tls.VersionTLS12,
	},
}
