Skip to content

Conversation

@chlunde
Copy link

@chlunde chlunde commented Oct 31, 2018

Add support for SASL authentication by allowing the user to set the
SASLClient field on the kafka.Dialer struct.

The user must provide its own implementation of kafka.SASLClient because
there is currently no SASL library for Go with support for all the
implementations Kafka supports, and this will allow kafka-go to support
more SASL mechanisms without changing the core library.

The tests have been updated to test PLAIN authentication against a live
server. The implementation has also been tested using SCRAM-SHA-256
and SCRAM-SHA-512, against 0.11.0.3 and 2.0.1.

This commit introduces four new calls agains kafka, which will only be
used if SASLClient is set:

  • ApiVersionsRequest v1
  • SaslHandshakeRequest v0 and v1
  • SaslAuthenticateRequestV0
  • Raw SASL packets

For more information about the authentication sequence, please see
https://kafka.apache.org/protocol#sasl_handshake

TODO: For Kerberos and SCRAM-SHA-256-PLUS support the interface methods
for kafka.SASLClient might need to be extended.

Example using github.com/xdg/scram to implement SCRAM-SHA-512:

import (
        "context"
        "crypto/sha512"
        "hash"
        "log"

        kafka "github.com/segmentio/kafka-go"
        "github.com/xdg/scram"
)

var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type SCRAMClient struct {
        client *scram.ClientConversation
}

func (s *SCRAMClient) Mechanism() string { return "SCRAM-SHA-512" }

func (s *SCRAMClient) Start(ctx context.Context) ([]byte, error) {
        str, err := s.client.Step("")
        return []byte(str), err
}

func (s *SCRAMClient) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
        str, err := s.client.Step(string(challenge))
        return s.client.Done(), []byte(str), err
}

func main() {
        scramClient, err := SHA512.NewClient("adminscram", "admin-secret", "")
        if err != nil {
                log.Fatal(err)
        }

        r := kafka.NewReader(kafka.ReaderConfig{
                Dialer: &kafka.Dialer{
                        SASLClient: func() kafka.SASLClient { return &SCRAMClient{scramClient.NewConversation()} },
                },
                Brokers: []string{"localhost:9094"},
                Topic:   "test-writer-1",
        })

Updates #109

error.go Outdated
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add details too?

conn.go Outdated
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider return some an error if error message is set too? Should we add the message to kafka.Error (currently just an int)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change the type of kafka.Error it's likely that a lot of code is going to break. The best I can think of is introducing an error type which exposes both the error value and the message and that program can test against.

Copy link

@frekui frekui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please let me know if there is anything I can help out with to move this forward. I have added some comments to the PR.

Before I knew this PR existed I started to add support for SASL myself. I used github.com/xdg/scram for the SCRAM code, but it's probably a good idea to make kafka-go independent of any SASL implementation as done in this PR.

@chlunde
Copy link
Author

chlunde commented Nov 7, 2018

@frekui thanks, I will clean up the code and ping the maintainers. I assume they want integration tests too, so I will have to look at that too.

@frekui
Copy link

frekui commented Nov 8, 2018

Regarding integration tests, currently Kafka 0.11.0.1 is used in the tests. That version doesn't have support for SaslHandshakeRequest v1, only v0 is supported. When v0 is used for the handshake the serialization of SaslAuthenticate must be changed. With v1 the usual Kafka protocol message headers are used, but with v0 a simpler encapsulation is used see http://kafka.apache.org/protocol.html#sasl_handshake for details (however the documentation fails to mention that each message is prefixed with its length encoded as a big-end 4-byte integer, see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L245).

Add support for SASL authentication by allowing the user to set the
SASLClient field on the kafka.Dialer struct.

The user must provide its own implementation of kafka.SASLClient because
there is currently no SASL library for Go with support for all the
implementations Kafka supports, and this will allow kafka-go to support
more SASL mechanisms without changing the core library.

The tests have been updated to test PLAIN authentication against a live
server.  The implementation has also been tested using SCRAM-SHA-256
and SCRAM-SHA-512, against 0.11.0.3 and 2.0.1.

This commit introduces four new calls agains kafka, which will only be
used if SASLClient is set:

 - ApiVersionsRequest v1
 - SaslHandshakeRequest v0 and v1
 - SaslAuthenticateRequestV0
 - Raw SASL packets

For more information about the authentication sequence, please see
https://kafka.apache.org/protocol#sasl_handshake

TODO: For Kerberos and SCRAM-SHA-256-PLUS support the interface methods
for kafka.SASLClient might need to be extended.

Example using github.com/xdg/scram to implement SCRAM-SHA-512:

    import (
            "context"
            "crypto/sha512"
            "hash"
            "log"

            kafka "github.com/segmentio/kafka-go"
            "github.com/xdg/scram"
    )

    var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

    type SCRAMClient struct {
            client *scram.ClientConversation
    }

    func (s *SCRAMClient) Mechanism() string { return "SCRAM-SHA-512" }

    func (s *SCRAMClient) Start(ctx context.Context) ([]byte, error) {
            str, err := s.client.Step("")
            return []byte(str), err
    }

    func (s *SCRAMClient) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
            str, err := s.client.Step(string(challenge))
            return s.client.Done(), []byte(str), err
    }

    func main() {
            scramClient, err := SHA512.NewClient("adminscram", "admin-secret", "")
            if err != nil {
                    log.Fatal(err)
            }

            r := kafka.NewReader(kafka.ReaderConfig{
                    Dialer: &kafka.Dialer{
                            SASLClient: func() kafka.SASLClient { return &SCRAMClient{scramClient.NewConversation()} },
                    },
                    Brokers: []string{"localhost:9094"},
                    Topic:   "test-writer-1",
            })
@chlunde
Copy link
Author

chlunde commented Nov 14, 2018

I've updated the PR with integration tests and support for Kafka 0.11. Could someone take a look? @achille-roussel

If we want

  • Negotiation
  • Kerberos support
  • SCRAM...-PLUS-support

The interface might need changes.

@chlunde chlunde changed the title Add SASL support (WIP) Add SASL support Nov 14, 2018
Copy link

@frekui frekui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW this looks good to me. I have tested it in my environment and it works. Good job @chlunde!

@chlunde
Copy link
Author

chlunde commented Nov 21, 2018

Hi @abraithwaite, I see you created the original ticket. Is there anything I can do to move this forward?

conn.go Outdated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change the type of kafka.Error it's likely that a lot of code is going to break. The best I can think of is introducing an error type which exposes both the error value and the message and that program can test against.


// The SASLClient interface is used to enable plugging in different
// SASL implementations at compile time.
type SASLClient interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about using the same interface than go-sasl here? https://godoc.org/github.com/emersion/go-sasl#Client

It could help to promote interoperability instead of requiring programs to provide a shim to bridge the two interfaces.

@achille-roussel
Copy link
Contributor

@chlunde what changes would need to be made to support the other features you mentioned?

@sirajmansour
Copy link

sirajmansour commented Nov 28, 2018

I've tested this with consumer groups, it seems to be hanging/getting stuck at :

rebalancing consumer group, {my-group

CODE :

package main

import (
	"context"
	"crypto/sha256"
	"hash"
	"log"
	"os"

	kafka "github.com/segmentio/kafka-go"
	"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }

type SCRAMClient struct {
	client *scram.ClientConversation
}

func (s *SCRAMClient) Mechanism() string { return "SCRAM-SHA-256" }

func (s *SCRAMClient) Start(ctx context.Context) ([]byte, error) {
	str, err := s.client.Step("")
	return []byte(str), err
}

func (s *SCRAMClient) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
	str, err := s.client.Step(string(challenge))
	return s.client.Done(), []byte(str), err
}

func main() {
	scramClient, err := SHA256.NewClient("user", "pass", "")
	if err != nil {
		log.Fatal(err)
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Dialer: &kafka.Dialer{
			SASLClient: func() kafka.SASLClient { return &SCRAMClient{scramClient.NewConversation()} },
			ClientID:   "my-client",
		},
		Brokers:     []string{"mybroker:9092"},
		Topic:       "test",
		GroupID:     "my-group",
		Logger:      log.New(os.Stdout, "", log.LstdFlags),
		ErrorLogger: log.New(os.Stderr, "", log.LstdFlags),
		MinBytes:    1,
		MaxBytes:    10e6,
	})

	log.Println("Created Reader. Starting loop")

	for {
		m, err := r.FetchMessage(context.Background())
		log.Println("after fetch")
		if err == nil {
			log.Println(string(m.Value))
		} else {
			log.Printf("error: %v", err)
			log.Println()
		}
	}
}

@achille-roussel
Copy link
Contributor

@sirajmansour you may have to rebase this branch on master to make sure you're not running in the bug we fixed in #143

If you want to provide a dump of your stack trace that could help identify the issue as well.

@sirajmansour
Copy link

@achille-roussel i can confirm this works after rebasing on master 👍

@efpe
Copy link

efpe commented Dec 19, 2018

Any news on this? It would be really nice to support SASL in Go finally :)

@achille-roussel
Copy link
Contributor

@chlunde do you think you'll be able to take this to completion?

Otherwise I think we can hand this change to another contributor to get it finished.

@Peltoche
Copy link

From what I understand, the PR works well and only need changes if we want to add the kerberos and negotiation features. This can probably be done in another PR. I know for sure that several companies (including mine) really need this SASL support a soon as possible.

If I can make anything in order to help, please tell me.

@chlunde
Copy link
Author

chlunde commented Jan 14, 2019

@achille-roussel Sorry, I don't know when I will have time to look at this again, at least not this week, so if someone has time it would be awesome if they could help.

I think the interface in https://godoc.org/mellium.im/sasl#Option looks most extensible, it can support SASL-SCRAM-..-PLUS, and probably kerberos in the future, without a breaking API change. Negotiation support might require a breaking change in the main library, if I remember correctly it would require a "probe" connection to find the supported mechanisms, or reconnecting during the normal connection flow. @SamWhited is mellium.im/sasl stable?

@SamWhited
Copy link

SamWhited commented Jan 14, 2019

I think the interface in https://godoc.org/mellium.im/sasl#Option looks most extensible, it can support SASL-SCRAM-..-PLUS, and probably kerberos in the future without a breaking API change.

SCRAM-*-PLUS is implemented in the main package of course, and I believe there was a fork that added Kerberos support floating around somewhere, but it would also be easily possible to create a new package with a Kerberos mechanism, it really doesn't need to be in the main library. Extensibility is actually a bit of a problem right now, but that's something I need to rethink and see if it can be made better (see below).

Negotiation support might require a breaking change in the main library, if I remember correctly it would require a "probe" connection to find the supported mechanisms, or reconnecting during the normal connection flow.

I'm not sure how Kafka handles auth, but I don't see why this would be the case. Basically everything has to probe for the supported mechanisms first, you just do that before creating the state machine used for auth; it doesn't need to be part of the actual SASL library. For example, I use this library with XMPP and all the protocol specific stuff like probing for mechanisms happens out of band.

Feel free to ping me to discuss though if I'm missing something and the current API doesn't meet your needs.

@SamWhited is mellium.im/sasl stable?

TL;DR — Soon maybe, but probably not right this moment, sorry.

Now that Go Modules is a thing I'm more inclined to tag a stable release (since there is a clear path to upgrading if something happens that breaks the API). There are a few design decisions I want to rethink first though (suggestions welcome) such as whether the API should be based on []byte or io.Reader (in case a user wants to implement allocation free negotiation), and how best to handle credentials since different mechanisms can require such widely divergent options and the way I've done it right now may lock me into having to add different properties for every single mechanism invented in the future that uses something more than usernames and passwords.

If you're interested in discussing any of this or how we can make the package meet your needs better, please ping me via email or the Jabber network; my address is sam@samwhited.com in both cases.

@stevevls
Copy link
Contributor

stevevls commented Feb 8, 2019

Thanks for the great work and discussion so far! I'm really excited to be helping to shepherd this PR into kafka-go. First off, a couple of exciting things have happened since this PR was initially opened:

  1. We're now running unit tests for four versions of Kafka (Test against multiple Kafka versions #186, Run tests for kafka 0.10 #187)
  2. We merged in support for ApiVersions, and versions are cached on the Conn (Add ApiVersions request #160)

I think that the best course of action from here would be to merge master into your branch to pull in those updates and resolve conflicts. Then I think we should scope this PR to the PLAIN and SCRAM authentication mechanisms. We can then open issues to add support for the following:

  1. Add support for GSSAPI (Kafka 0.10.0+)
  2. Add support for OAUTHBEARER (Kafka 2.0.0+)
  3. Mechanism Negotation (though I'm not sure this feature would add a ton of value)

Knowing that we have future work in mind, it would be great to do some up-front work to prepare for adding more mechanisms. I think that means a) getting the configuration, packaging, and interface correct, and b) setting up good, re-usable testing.

I've broken down feedback/asks into groups below:

Packaging

One goal that we have with kafka-go is to minimize the number of external dependencies. If you take the compression code as an example, you can see that a client of the kafka-go only needs to import compression codecs (and their associated dependencies) that they actually use. Having done a survey of the Go SASL libraries, I don't think there's going to be a single one that's going to work for all mechanisms present and future. We'll probably end up using different libs for different mechanisms, which underscores the importance of packaging as a means to avoid bloat.

Interface

I read the SASL RFC, and did a survey of the Go SASL libraries, and the interface I liked best was the one @achille-roussel linked: https://godoc.org/github.com/emersion/go-sasl#Client. I propose we adopt that one. However, I prefer the interface name to be sasl.Mechanism such that we match the terminology used in the RFC. I find the name Client to be kind of a misnomer b/c it's not actually a client of anything.

With SASL mechanisms in their own packages, you can export constructors , e.g. scram.SHA256(user, pass string) and scram.SHA512(user, pass string). The reason I would opt for constructors instead of exported variables is that I don't think credentials should be part of the API. Some mechanisms will use user/pass combos, but others will use tokens. As such, I think an attempt to generalize the credentials would be misguided. Happy to hear other thoughts on that, though.

Config

I would like to see configuration options on the Reader and Writer to take a sasl.Mechanism so that this feature is available to higher-level abstractions in addition to the Conn. Doing so will make it much easier for clients of the library to pick this feature up and benefit from your work.

SCRAM

AFAICT, Kafka only supports SCRAM-SHA-256 and SCRAM-SHA-512. It does not support any of the SCRAM-*-PLUS mechanisms, so no need to worry about channel binding. It looks as though SCRAM was added in 0.10.2.0, so we may want to consider adding another broker version to run tests against against the 0.10.X line. Have a look at KafkaIsAtLeast in version_test.go as a means to skip unit tests for unsupported broker versions.

Testing

I'd like to see two test functions: one that takes a sasl.Mechanism and expects a positive authentication result from the broker and a second that takes a sasl.Mechanism and expects a negative result. You can then use that function to validate the three mechanisms in this PR (PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512), and future work to add new mechanisms will also be able to leverage it.


I know this is a lot of feedback, but fortunately the code is already most of the way there. I think that if we do this little bit more work now that it's going to pay off big time by enabling us to add more SASL mechanisms later without bloating kafka-go or having to make backward incompatible changes. Thanks again for your work so far, and let me know if I can help in any way!

@stevevls
Copy link
Contributor

@chlunde I wanted to check in to see if you were still working on this PR. If not, would you mind if I picked it up and finished it off? Thanks!

@stevevls
Copy link
Contributor

stevevls commented Mar 1, 2019

Thanks for getting started here! We picked it up and incorporated it into #223.

@stevevls stevevls closed this Mar 1, 2019
@chlunde
Copy link
Author

chlunde commented Mar 2, 2019

@stevevls sorry, I missed your message, and as you assumed I don't have any extra time now. Thanks a lot for picking this up again!

waxzce pushed a commit to CleverCloud/stream-dns that referenced this pull request Apr 8, 2019
The kafka-go reader doesn't support SASL for know: segmentio/kafka-go#134
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants