From 588bbf31756087b91d93369c114e4398befe261e Mon Sep 17 00:00:00 2001 From: Carl Henrik Lunde Date: Wed, 14 Nov 2018 21:52:23 +0100 Subject: [PATCH 1/5] WIP: Add SASL support (#109) Initial contribution that provides a skeleton for SASL support. For more information about the authentication sequence, please see https://kafka.apache.org/protocol#sasl_handshake --- apiversions.go | 88 ++++++++++++++++++++++++++++ apiversions_test.go | 41 +++++++++++++ conn.go | 121 +++++++++++++++++++++++++++++++++++++++ dialer.go | 83 ++++++++++++++++++++++++++- dialer_test.go | 35 +++++++++++ docker-compose.yml | 17 ++++-- error.go | 84 --------------------------- protocol.go | 2 + saslauthenticate.go | 54 +++++++++++++++++ saslauthenticate_test.go | 62 ++++++++++++++++++++ saslhandshake.go | 53 +++++++++++++++++ saslhandshake_test.go | 61 ++++++++++++++++++++ 12 files changed, 610 insertions(+), 91 deletions(-) create mode 100644 apiversions.go create mode 100644 apiversions_test.go create mode 100644 saslauthenticate.go create mode 100644 saslauthenticate_test.go create mode 100644 saslhandshake.go create mode 100644 saslhandshake_test.go diff --git a/apiversions.go b/apiversions.go new file mode 100644 index 000000000..30866a1b5 --- /dev/null +++ b/apiversions.go @@ -0,0 +1,88 @@ +package kafka + +import ( + "bufio" +) + +type apiVersionsRequestV1 struct { +} + +func (t apiVersionsRequestV1) size() int32 { + return 0 +} + +func (t apiVersionsRequestV1) writeTo(w *bufio.Writer) {} + +type apiVersionsResponseV1Range struct { + APIKey int16 + + MinVersion int16 + MaxVersion int16 +} + +func (t *apiVersionsResponseV1Range) readFrom(r *bufio.Reader, sz int) (remain int, err error) { + if remain, err = readInt16(r, sz, &t.APIKey); err != nil { + return + } + if remain, err = readInt16(r, remain, &t.MinVersion); err != nil { + return + } + if remain, err = readInt16(r, remain, &t.MaxVersion); err != nil { + return + } + return +} + +func (t apiVersionsResponseV1Range) writeTo(w *bufio.Writer) { + writeInt16(w, t.APIKey) + writeInt16(w, t.MinVersion) + writeInt16(w, t.MaxVersion) +} + +func (t *apiVersionsResponseV1Range) size() int32 { return 6 } + +type apiVersionsResponseV1 struct { + // ErrorCode holds response error code + ErrorCode int16 + + // Responses holds api versions per api key + APIVersions []apiVersionsResponseV1Range + + // ThrottleTimeMS holds the duration in milliseconds for which the request + // was throttled due to quota violation (Zero if the request did not violate + // any quota) + ThrottleTimeMS int32 +} + +func (t apiVersionsResponseV1) size() int32 { + return sizeofInt16(t.ErrorCode) + + sizeofArray(len(t.APIVersions), func(i int) int32 { return t.APIVersions[i].size() }) + + sizeofInt32(t.ThrottleTimeMS) +} + +func (t apiVersionsResponseV1) writeTo(w *bufio.Writer) { + writeInt16(w, t.ErrorCode) + writeArray(w, len(t.APIVersions), func(i int) { t.APIVersions[i].writeTo(w) }) + writeInt32(w, t.ThrottleTimeMS) +} + +func (t *apiVersionsResponseV1) readFrom(r *bufio.Reader, sz int) (remain int, err error) { + if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil { + return + } + fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) { + item := apiVersionsResponseV1Range{} + if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil { + return + } + t.APIVersions = append(t.APIVersions, item) + return + } + if remain, err = readArrayWith(r, remain, fn); err != nil { + return + } + if remain, err = readInt32(r, remain, &t.ThrottleTimeMS); err != nil { + return + } + return +} diff --git a/apiversions_test.go b/apiversions_test.go new file mode 100644 index 000000000..8c5f8afcb --- /dev/null +++ b/apiversions_test.go @@ -0,0 +1,41 @@ +package kafka + +import ( + "bufio" + "bytes" + "reflect" + "testing" +) + +func TestAPIVersionsResponseV1(t *testing.T) { + item := apiVersionsResponseV1{ + ErrorCode: 2, + APIVersions: []apiVersionsResponseV1Range{ + { + APIKey: 1, + MinVersion: 1, + MaxVersion: 3, + }, + }, + } + + buf := bytes.NewBuffer(nil) + w := bufio.NewWriter(buf) + item.writeTo(w) + w.Flush() + + var found apiVersionsResponseV1 + remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } +} diff --git a/conn.go b/conn.go index bcfb14c3e..2beab6ae0 100644 --- a/conn.go +++ b/conn.go @@ -983,6 +983,12 @@ func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID return c.wbuf.Flush() } +func (c *Conn) writeRaw(data []byte) error { + writeInt32(&c.wbuf, int32(len(data))) + c.wbuf.Write(data) + return c.wbuf.Flush() +} + func (c *Conn) readResponse(size int, res interface{}) error { size, err := read(&c.rbuf, size, res) switch err.(type) { @@ -1242,3 +1248,118 @@ func (d *connDeadline) unsetConnWriteDeadline() { d.wconn = nil d.mutex.Unlock() } + +// saslHandshake sends the SASL handshake message +// +// See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake +func (c *Conn) saslHandshake(version apiVersion, mechanism string) ([]string, error) { + // The wire format for V0 and V1 is identical, but the version + // number will affect how the SASL authentication + // challenge/reponses are sent + var resp saslHandshakeResponseV0 + + err := c.writeOperation( + func(deadline time.Time, id int32) error { + return c.writeRequest(saslHandshakeRequest, version, id, &saslHandshakeRequestV0{Mechanism: mechanism}) + }, + func(deadline time.Time, size int) error { + return expectZeroSize(func() (remain int, err error) { + return (&resp).readFrom(&c.rbuf, size) + }()) + }, + ) + if err != nil { + return nil, err + } + if resp.ErrorCode != 0 { + return resp.EnabledMechanisms, Error(resp.ErrorCode) + } + + return resp.EnabledMechanisms, nil +} + +// saslAuthenticate sends the SASL authenticate message +// +// See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate +func (c *Conn) saslAuthenticate(opaque bool, data []byte) ([]byte, error) { + if opaque { + c.writeRaw(data) + var respLen int32 + _, err := readInt32(&c.rbuf, 4, &respLen) + if err != nil { + return nil, err + } + + resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen)) + if err != nil { + return nil, err + } + + return resp, nil + } else { + var request = saslAuthenticateRequestV0{Data: data} + var response saslAuthenticateResponseV0 + + err := c.writeOperation( + func(deadline time.Time, id int32) error { + return c.writeRequest(saslAuthenticateRequest, v0, id, request) + }, + func(deadline time.Time, size int) error { + return expectZeroSize(func() (remain int, err error) { + return (&response).readFrom(&c.rbuf, size) + }()) + }, + ) + + if err != nil { + return nil, err + } + if response.ErrorMessage != "" { + return nil, fmt.Errorf("SASL error message: %v", response.ErrorMessage) + } + if response.ErrorCode != 0 { + return nil, Error(response.ErrorCode) + } + + return response.Data, nil + } +} + +type versionRange struct { + minVersion apiVersion + maxVersion apiVersion +} + +// apiVersions gets the supported version ranges from the broker +// +// See http://kafka.apache.org/protocol.html#The_Messages_ApiVersions +func (c *Conn) apiVersions() (map[apiKey]versionRange, error) { + var request apiVersionsRequestV1 + var response apiVersionsResponseV1 + + err := c.writeOperation( + func(deadline time.Time, id int32) error { + return c.writeRequest(apiVersionsRequest, v1, id, request) + }, + func(deadline time.Time, size int) error { + return expectZeroSize(func() (remain int, err error) { + return (&response).readFrom(&c.rbuf, size) + }()) + }, + ) + + if err != nil { + return nil, err + } + + if response.ErrorCode != 0 { + return nil, Error(response.ErrorCode) + } + + ranges := make(map[apiKey]versionRange) + for _, r := range response.APIVersions { + ranges[apiKey(r.APIKey)] = versionRange{minVersion: apiVersion(r.MinVersion), maxVersion: apiVersion(r.MaxVersion)} + } + + return ranges, nil +} diff --git a/dialer.go b/dialer.go index ba6e7db2e..52f8e3090 100644 --- a/dialer.go +++ b/dialer.go @@ -61,6 +61,9 @@ type Dialer struct { // TLS enables Dialer to open secure connections. If nil, standard net.Conn // will be used. TLS *tls.Config + + // SASLClient enables SASL authentication + SASLClient func() SASLClient } // Dial connects to the address on the named network. @@ -98,7 +101,16 @@ func (d *Dialer) DialContext(ctx context.Context, network string, address string if err != nil { return nil, err } - return NewConnWith(c, ConnConfig{ClientID: d.ClientID}), nil + + conn := NewConnWith(c, ConnConfig{ClientID: d.ClientID}) + + if d.SASLClient != nil { + if err := d.authenticateSASL(ctx, conn); err != nil { + return nil, err + } + } + + return conn, nil } // DialLeader opens a connection to the leader of the partition for a given @@ -126,11 +138,19 @@ func (d *Dialer) DialPartition(ctx context.Context, network string, address stri return nil, err } - return NewConnWith(c, ConnConfig{ + conn := NewConnWith(c, ConnConfig{ ClientID: d.ClientID, Topic: partition.Topic, Partition: partition.ID, - }), nil + }) + + if d.SASLClient != nil { + if err := d.authenticateSASL(ctx, conn); err != nil { + return nil, err + } + } + + return conn, nil } // LookupLeader searches for the kafka broker that is the leader of the @@ -242,6 +262,47 @@ func (d *Dialer) connectTLS(ctx context.Context, conn net.Conn, config *tls.Conf return } +func (d *Dialer) authenticateSASL(ctx context.Context, conn *Conn) error { + versions, err := conn.apiVersions() + if err != nil { + return err + } + + saslVersion := versions[saslHandshakeRequest] + var handshakeVersion = v0 + var opaque = true + if saslVersion.maxVersion >= v1 { + opaque = false + handshakeVersion = v1 + } + + client := d.SASLClient() + _, err = conn.saslHandshake(handshakeVersion, client.Mechanism()) + if err != nil { + return err // TODO: allow mechanism negotiation by returning the list of supported mechanisms + } + + bytes, err := client.Start(ctx) + if err != nil { + return err + } + + var completed bool + for !completed { + challenge, err := conn.saslAuthenticate(opaque, bytes) + if err != nil { + return err + } + + completed, bytes, err = client.Next(ctx, challenge) + if err != nil { + return err + } + } + + return nil +} + func (d *Dialer) dialContext(ctx context.Context, network string, address string) (net.Conn, error) { if r := d.Resolver; r != nil { host, port := splitHostPort(address) @@ -332,6 +393,22 @@ type Resolver interface { LookupHost(ctx context.Context, host string) (addrs []string, err error) } +// The SASLClient interface is used to enable plugging in different +// SASL implementations at compile time. +type SASLClient interface { + // Mechanism returns the name of the mechanism (eg. SCRAM-SHA-256) + Mechanism() string + + // Start returns the initial client response to send to the server + Start(ctx context.Context) (response []byte, err error) + + // Next computes the response to the server challenge and may be + // called multiple times until completed is set. Completed may be + // set either because the authentication exchange has failed or + // succeeded. If the authentication failed, err must be non-nil. + Next(ctx context.Context, challenge []byte) (completed bool, response []byte, err error) +} + func sleep(ctx context.Context, duration time.Duration) bool { if duration == 0 { select { diff --git a/dialer_test.go b/dialer_test.go index 000579212..14bcbbecf 100644 --- a/dialer_test.go +++ b/dialer_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "fmt" "io" "net" "reflect" @@ -21,6 +22,10 @@ func TestDialer(t *testing.T) { scenario: "looking up partitions returns the list of available partitions for a topic", function: testDialerLookupPartitions, }, + { + scenario: "log in using SASL PLAIN authentication", + function: testDialerSASLPlainAuthentication, + }, } for _, test := range tests { @@ -74,6 +79,36 @@ func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) { } } +// PLAINSASLClient is a an SASL PLAIN implementation for testing purposes +// It does not support saslprep +type PLAINSASLClient struct { + Username string + Password string +} + +func (s *PLAINSASLClient) Mechanism() string { return "PLAIN" } + +func (s *PLAINSASLClient) Start(ctx context.Context) ([]byte, error) { + return []byte(fmt.Sprintf("\x00%s\x00%s", s.Username, s.Password)), nil +} + +func (s *PLAINSASLClient) Next(ctx context.Context, challenge []byte) (bool, []byte, error) { + return true, nil, nil +} + +func testDialerSASLPlainAuthentication(t *testing.T, ctx context.Context, d *Dialer) { + var saslDialer = *d + saslDialer.SASLClient = func() SASLClient { + return &PLAINSASLClient{Username: "adminplain", Password: "admin-secret"} + } + + _, err := saslDialer.LookupPartitions(context.Background(), "tcp", "localhost:9094", "non-existing-topic") + if err != nil { + t.Error(err) + return + } +} + func tlsConfig(t *testing.T) *tls.Config { const ( certPEM = `-----BEGIN CERTIFICATE----- diff --git a/docker-compose.yml b/docker-compose.yml index 6c59b8760..b713c9563 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,22 +1,31 @@ version: "3" services: kafka: - image: wurstmeister/kafka:0.11.0.1 + image: wurstmeister/kafka:2.11-0.11.0.3 restart: on-failure:3 links: - zookeeper ports: - "9092:9092" + - "9094:9094" environment: - KAFKA_VERSION: '0.11.0.1' + KAFKA_VERSION: '0.11.0.3' KAFKA_BROKER_ID: 1 KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' KAFKA_DELETE_TOPIC_ENABLE: 'true' - KAFKA_ADVERTISED_HOST_NAME: 'localhost' - KAFKA_ADVERTISED_PORT: '9092' + KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9094' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9094' KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_MESSAGE_MAX_BYTES: 200000000 + KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL' + KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512,PLAIN + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" + CUSTOM_INIT_SCRIPT: |- + echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; + /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name adminscram + zookeeper: image: wurstmeister/zookeeper diff --git a/error.go b/error.go index 8034308c8..3dfec5521 100644 --- a/error.go +++ b/error.go @@ -222,48 +222,6 @@ func (e Error) Title() string { return "Security Disabled" case BrokerAuthorizationFailed: return "Broker Authorization Failed" - case KafkaStorageError: - return "Kafka Storage Error" - case LogDirNotFound: - return "Log Dir Not Found" - case SASLAuthenticationFailed: - return "SASL Authentication Failed" - case UnknownProducerId: - return "Unknown Producer ID" - case ReassignmentInProgress: - return "Reassignment In Progress" - case DelegationTokenAuthDisabled: - return "Delegation Token Auth Disabled" - case DelegationTokenNotFound: - return "Delegation Token Not Found" - case DelegationTokenOwnerMismatch: - return "Delegation Token Owner Mismatch" - case DelegationTokenRequestNotAllowed: - return "Delegation Token Request Not Allowed" - case DelegationTokenAuthorizationFailed: - return "Delegation Token Authorization Failed" - case DelegationTokenExpired: - return "Delegation Token Expired" - case InvalidPrincipalType: - return "Invalid Principal Type" - case NonEmptyGroup: - return "Non Empty Group" - case GroupIdNotFound: - return "Group ID Not Found" - case FetchSessionIDNotFound: - return "Fetch Session ID Not Found" - case InvalidFetchSessionEpoch: - return "Invalid Fetch Session Epoch" - case ListenerNotFound: - return "Listener Not Found" - case TopicDeletionDisabled: - return "Topic Deletion Disabled" - case FencedLeaderEpoch: - return "Fenced Leader Epoch" - case UnknownLeaderEpoch: - return "Unknown Leader Epoch" - case UnsupportedCompressionType: - return "Unsupported Compression Type" } return "" } @@ -381,48 +339,6 @@ func (e Error) Description() string { return "the security features are disabled" case BrokerAuthorizationFailed: return "the broker authorization failed" - case KafkaStorageError: - return "disk error when trying to access log file on the disk" - case LogDirNotFound: - return "the user-specified log directory is not found in the broker config" - case SASLAuthenticationFailed: - return "SASL Authentication failed" - case UnknownProducerId: - return "the broker could not locate the producer metadata associated with the producer ID" - case ReassignmentInProgress: - return "a partition reassignment is in progress" - case DelegationTokenAuthDisabled: - return "delegation token feature is not enabled" - case DelegationTokenNotFound: - return "delegation token is not found on server" - case DelegationTokenOwnerMismatch: - return "specified principal is not valid owner/renewer" - case DelegationTokenRequestNotAllowed: - return "delegation token requests are not allowed on plaintext/1-way ssl channels and on delegation token authenticated channels" - case DelegationTokenAuthorizationFailed: - return "delegation token authorization failed" - case DelegationTokenExpired: - return "delegation token is expired" - case InvalidPrincipalType: - return "supplied principaltype is not supported" - case NonEmptyGroup: - return "the group is not empty" - case GroupIdNotFound: - return "the group ID does not exist" - case FetchSessionIDNotFound: - return "the fetch session ID was not found" - case InvalidFetchSessionEpoch: - return "the fetch session epoch is invalid" - case ListenerNotFound: - return "there is no listener on the leader broker that matches the listener on which metadata request was processed" - case TopicDeletionDisabled: - return "topic deletion is disabled" - case FencedLeaderEpoch: - return "the leader epoch in the request is older than the epoch on the broker" - case UnknownLeaderEpoch: - return "the leader epoch in the request is newer than the epoch on the broker" - case UnsupportedCompressionType: - return "the requesting client does not support the compression type of given partition" } return "" } diff --git a/protocol.go b/protocol.go index b588aca72..039b1cc71 100644 --- a/protocol.go +++ b/protocol.go @@ -22,9 +22,11 @@ const ( syncGroupRequest apiKey = 14 describeGroupsRequest apiKey = 15 listGroupsRequest apiKey = 16 + saslHandshakeRequest apiKey = 17 apiVersionsRequest apiKey = 18 createTopicsRequest apiKey = 19 deleteTopicsRequest apiKey = 20 + saslAuthenticateRequest apiKey = 36 ) type apiVersion int16 diff --git a/saslauthenticate.go b/saslauthenticate.go new file mode 100644 index 000000000..487c96edc --- /dev/null +++ b/saslauthenticate.go @@ -0,0 +1,54 @@ +package kafka + +import ( + "bufio" +) + +type saslAuthenticateRequestV0 struct { + // Data holds the SASL payload + Data []byte +} + +func (t saslAuthenticateRequestV0) size() int32 { + return sizeofBytes(t.Data) +} + +func (t *saslAuthenticateRequestV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) { + return readBytes(r, sz, &t.Data) +} + +func (t saslAuthenticateRequestV0) writeTo(w *bufio.Writer) { + writeBytes(w, t.Data) +} + +type saslAuthenticateResponseV0 struct { + // ErrorCode holds response error code + ErrorCode int16 + + ErrorMessage string + + Data []byte +} + +func (t saslAuthenticateResponseV0) size() int32 { + return sizeofInt16(t.ErrorCode) + sizeofString(t.ErrorMessage) + sizeofBytes(t.Data) +} + +func (t saslAuthenticateResponseV0) writeTo(w *bufio.Writer) { + writeInt16(w, t.ErrorCode) + writeString(w, t.ErrorMessage) + writeBytes(w, t.Data) +} + +func (t *saslAuthenticateResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) { + if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil { + return + } + if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { + return + } + if remain, err = readBytes(r, remain, &t.Data); err != nil { + return + } + return +} diff --git a/saslauthenticate_test.go b/saslauthenticate_test.go new file mode 100644 index 000000000..b064fcaa1 --- /dev/null +++ b/saslauthenticate_test.go @@ -0,0 +1,62 @@ +package kafka + +import ( + "bufio" + "bytes" + "reflect" + "testing" +) + +func TestSASLAuthenticateRequestV0(t *testing.T) { + item := saslAuthenticateRequestV0{ + Data: []byte("\x00user\x00pass"), + } + + buf := bytes.NewBuffer(nil) + w := bufio.NewWriter(buf) + item.writeTo(w) + w.Flush() + + var found saslAuthenticateRequestV0 + remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } +} + +func TestSASLAuthenticateResponseV0(t *testing.T) { + item := saslAuthenticateResponseV0{ + ErrorCode: 2, + ErrorMessage: "Message", + Data: []byte("bytes"), + } + + buf := bytes.NewBuffer(nil) + w := bufio.NewWriter(buf) + item.writeTo(w) + w.Flush() + + var found saslAuthenticateResponseV0 + remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } +} diff --git a/saslhandshake.go b/saslhandshake.go new file mode 100644 index 000000000..aa37e1122 --- /dev/null +++ b/saslhandshake.go @@ -0,0 +1,53 @@ +package kafka + +import ( + "bufio" +) + +// saslHandshakeRequestV0 implements the format for V0 and V1 SASL +// requests (they are identical) +type saslHandshakeRequestV0 struct { + // Mechanism holds the SASL Mechanism chosen by the client. + Mechanism string +} + +func (t saslHandshakeRequestV0) size() int32 { + return sizeofString(t.Mechanism) +} + +func (t *saslHandshakeRequestV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) { + return readString(r, sz, &t.Mechanism) +} + +func (t saslHandshakeRequestV0) writeTo(w *bufio.Writer) { + writeString(w, t.Mechanism) +} + +// saslHandshakeResponseV0 implements the format for V0 and V1 SASL +// responses (they are identical) +type saslHandshakeResponseV0 struct { + // ErrorCode holds response error code + ErrorCode int16 + + // Array of mechanisms enabled in the server + EnabledMechanisms []string +} + +func (t saslHandshakeResponseV0) size() int32 { + return sizeofInt16(t.ErrorCode) + sizeofStringArray(t.EnabledMechanisms) +} + +func (t saslHandshakeResponseV0) writeTo(w *bufio.Writer) { + writeInt16(w, t.ErrorCode) + writeStringArray(w, t.EnabledMechanisms) +} + +func (t *saslHandshakeResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) { + if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil { + return + } + if remain, err = readStringArray(r, remain, &t.EnabledMechanisms); err != nil { + return + } + return +} diff --git a/saslhandshake_test.go b/saslhandshake_test.go new file mode 100644 index 000000000..0b8e150b3 --- /dev/null +++ b/saslhandshake_test.go @@ -0,0 +1,61 @@ +package kafka + +import ( + "bufio" + "bytes" + "reflect" + "testing" +) + +func TestSASLHandshakeRequestV0(t *testing.T) { + item := saslHandshakeRequestV0{ + Mechanism: "SCRAM-SHA-512", + } + + buf := bytes.NewBuffer(nil) + w := bufio.NewWriter(buf) + item.writeTo(w) + w.Flush() + + var found saslHandshakeRequestV0 + remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } +} + +func TestSASLHandshakeResponseV0(t *testing.T) { + item := saslHandshakeResponseV0{ + ErrorCode: 2, + EnabledMechanisms: []string{"PLAIN", "SCRAM-SHA-512"}, + } + + buf := bytes.NewBuffer(nil) + w := bufio.NewWriter(buf) + item.writeTo(w) + w.Flush() + + var found saslHandshakeResponseV0 + remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } +} From d5bc3a82a103e0dcec1bf2b5a1b9ceacac33bb29 Mon Sep 17 00:00:00 2001 From: Steve van Loben Sels Date: Wed, 27 Feb 2019 10:16:10 -0800 Subject: [PATCH 2/5] Added support for SASL PLAIN and SCRAM mechanisms. Also ensured that adding support for new mechanisms in the future is as straightforward as possible. --- .circleci/config.yml | 37 ++++++++++++-- apiversions.go | 88 -------------------------------- apiversions_test.go | 41 --------------- conn.go | 118 +++++++++++++++---------------------------- conn_test.go | 17 +++++++ dialer.go | 105 +++++++++++++++----------------------- dialer_test.go | 35 ------------- docker-compose.yml | 20 ++++---- error.go | 84 ++++++++++++++++++++++++++++++ sasl.go | 31 ++++++++++++ sasl/plain/plain.go | 23 +++++++++ sasl/scram/scram.go | 83 ++++++++++++++++++++++++++++++ sasl_test.go | 99 ++++++++++++++++++++++++++++++++++++ 13 files changed, 461 insertions(+), 320 deletions(-) delete mode 100644 apiversions.go delete mode 100644 apiversions_test.go create mode 100644 sasl.go create mode 100644 sasl/plain/plain.go create mode 100644 sasl/scram/scram.go create mode 100644 sasl_test.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 93b895eaa..e7505597e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,7 +8,7 @@ jobs: - image: circleci/golang - image: wurstmeister/zookeeper ports: ['2181:2181'] - - image: wurstmeister/kafka:0.10.1.0 + - image: wurstmeister/kafka:0.10.1.1 ports: ['9092:9092'] environment: KAFKA_BROKER_ID: '1' @@ -18,6 +18,12 @@ jobs: KAFKA_ADVERTISED_PORT: '9092' KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' + KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" + CUSTOM_INIT_SCRIPT: |- + echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; steps: - checkout - setup_remote_docker: { reusable: true, docker_layer_caching: true } @@ -32,8 +38,8 @@ jobs: - image: circleci/golang - image: wurstmeister/zookeeper ports: ['2181:2181'] - - image: wurstmeister/kafka:0.11.0.1 - ports: ['9092:9092'] + - image: wurstmeister/kafka:2.11-0.11.0.3 + ports: ['9092:9092','9093:9093'] environment: KAFKA_BROKER_ID: '1' KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' @@ -42,6 +48,13 @@ jobs: KAFKA_ADVERTISED_PORT: '9092' KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' + KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" + CUSTOM_INIT_SCRIPT: |- + echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; + /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram steps: - checkout - setup_remote_docker: { reusable: true, docker_layer_caching: true } @@ -57,7 +70,7 @@ jobs: - image: wurstmeister/zookeeper ports: ['2181:2181'] - image: wurstmeister/kafka:2.11-1.1.1 - ports: ['9092:9092'] + ports: ['9092:9092','9093:9093'] environment: KAFKA_BROKER_ID: '1' KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' @@ -66,6 +79,13 @@ jobs: KAFKA_ADVERTISED_PORT: '9092' KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' + KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" + CUSTOM_INIT_SCRIPT: |- + echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; + /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram steps: - checkout - setup_remote_docker: { reusable: true, docker_layer_caching: true } @@ -81,7 +101,7 @@ jobs: - image: wurstmeister/zookeeper ports: ['2181:2181'] - image: wurstmeister/kafka:2.12-2.1.0 - ports: ['9092:9092'] + ports: ['9092:9092','9093:9093'] environment: KAFKA_BROKER_ID: '1' KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' @@ -90,6 +110,13 @@ jobs: KAFKA_ADVERTISED_PORT: '9092' KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' + KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" + CUSTOM_INIT_SCRIPT: |- + echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; + /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram steps: - checkout - setup_remote_docker: { reusable: true, docker_layer_caching: true } diff --git a/apiversions.go b/apiversions.go deleted file mode 100644 index 30866a1b5..000000000 --- a/apiversions.go +++ /dev/null @@ -1,88 +0,0 @@ -package kafka - -import ( - "bufio" -) - -type apiVersionsRequestV1 struct { -} - -func (t apiVersionsRequestV1) size() int32 { - return 0 -} - -func (t apiVersionsRequestV1) writeTo(w *bufio.Writer) {} - -type apiVersionsResponseV1Range struct { - APIKey int16 - - MinVersion int16 - MaxVersion int16 -} - -func (t *apiVersionsResponseV1Range) readFrom(r *bufio.Reader, sz int) (remain int, err error) { - if remain, err = readInt16(r, sz, &t.APIKey); err != nil { - return - } - if remain, err = readInt16(r, remain, &t.MinVersion); err != nil { - return - } - if remain, err = readInt16(r, remain, &t.MaxVersion); err != nil { - return - } - return -} - -func (t apiVersionsResponseV1Range) writeTo(w *bufio.Writer) { - writeInt16(w, t.APIKey) - writeInt16(w, t.MinVersion) - writeInt16(w, t.MaxVersion) -} - -func (t *apiVersionsResponseV1Range) size() int32 { return 6 } - -type apiVersionsResponseV1 struct { - // ErrorCode holds response error code - ErrorCode int16 - - // Responses holds api versions per api key - APIVersions []apiVersionsResponseV1Range - - // ThrottleTimeMS holds the duration in milliseconds for which the request - // was throttled due to quota violation (Zero if the request did not violate - // any quota) - ThrottleTimeMS int32 -} - -func (t apiVersionsResponseV1) size() int32 { - return sizeofInt16(t.ErrorCode) + - sizeofArray(len(t.APIVersions), func(i int) int32 { return t.APIVersions[i].size() }) + - sizeofInt32(t.ThrottleTimeMS) -} - -func (t apiVersionsResponseV1) writeTo(w *bufio.Writer) { - writeInt16(w, t.ErrorCode) - writeArray(w, len(t.APIVersions), func(i int) { t.APIVersions[i].writeTo(w) }) - writeInt32(w, t.ThrottleTimeMS) -} - -func (t *apiVersionsResponseV1) readFrom(r *bufio.Reader, sz int) (remain int, err error) { - if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil { - return - } - fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) { - item := apiVersionsResponseV1Range{} - if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil { - return - } - t.APIVersions = append(t.APIVersions, item) - return - } - if remain, err = readArrayWith(r, remain, fn); err != nil { - return - } - if remain, err = readInt32(r, remain, &t.ThrottleTimeMS); err != nil { - return - } - return -} diff --git a/apiversions_test.go b/apiversions_test.go deleted file mode 100644 index 8c5f8afcb..000000000 --- a/apiversions_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package kafka - -import ( - "bufio" - "bytes" - "reflect" - "testing" -) - -func TestAPIVersionsResponseV1(t *testing.T) { - item := apiVersionsResponseV1{ - ErrorCode: 2, - APIVersions: []apiVersionsResponseV1Range{ - { - APIKey: 1, - MinVersion: 1, - MaxVersion: 3, - }, - }, - } - - buf := bytes.NewBuffer(nil) - w := bufio.NewWriter(buf) - item.writeTo(w) - w.Flush() - - var found apiVersionsResponseV1 - remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) - if err != nil { - t.Error(err) - t.FailNow() - } - if remain != 0 { - t.Errorf("expected 0 remain, got %v", remain) - t.FailNow() - } - if !reflect.DeepEqual(item, found) { - t.Error("expected item and found to be the same") - t.FailNow() - } -} diff --git a/conn.go b/conn.go index 2beab6ae0..00ad8689e 100644 --- a/conn.go +++ b/conn.go @@ -983,12 +983,6 @@ func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID return c.wbuf.Flush() } -func (c *Conn) writeRaw(data []byte) error { - writeInt32(&c.wbuf, int32(len(data))) - c.wbuf.Write(data) - return c.wbuf.Flush() -} - func (c *Conn) readResponse(size int, res interface{}) error { size, err := read(&c.rbuf, size, res) switch err.(type) { @@ -1249,54 +1243,52 @@ func (d *connDeadline) unsetConnWriteDeadline() { d.mutex.Unlock() } -// saslHandshake sends the SASL handshake message +// saslHandshake sends the SASL handshake message. This will determine whether +// the Mechanism is supported by the cluster. If it's not, this function will +// error out with UnsupportedSASLMechanism. +// +// If the mechanism is unsupported, the handshake request will reply with the +// list of the cluster's configured mechanisms, which could potentially be used +// to facilitate negotiation. At the moment, we are not negotiating the +// mechanism as we believe that brokers are usually known to the client, and +// therefore the client should already know which mechanisms are supported. // // See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake -func (c *Conn) saslHandshake(version apiVersion, mechanism string) ([]string, error) { +func (c *Conn) saslHandshake(mechanism string) error { // The wire format for V0 and V1 is identical, but the version // number will affect how the SASL authentication - // challenge/reponses are sent + // challenge/responses are sent var resp saslHandshakeResponseV0 + version := v0 + if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 { + version = v1 + } err := c.writeOperation( func(deadline time.Time, id int32) error { return c.writeRequest(saslHandshakeRequest, version, id, &saslHandshakeRequestV0{Mechanism: mechanism}) }, func(deadline time.Time, size int) error { - return expectZeroSize(func() (remain int, err error) { + return expectZeroSize(func() (int, error) { return (&resp).readFrom(&c.rbuf, size) }()) }, ) - if err != nil { - return nil, err + if err == nil && resp.ErrorCode != 0 { + err = Error(resp.ErrorCode) } - if resp.ErrorCode != 0 { - return resp.EnabledMechanisms, Error(resp.ErrorCode) - } - - return resp.EnabledMechanisms, nil + return err } -// saslAuthenticate sends the SASL authenticate message +// saslAuthenticate sends the SASL authenticate message. This function must +// be immediately preceded by a successful saslHandshake. // // See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate -func (c *Conn) saslAuthenticate(opaque bool, data []byte) ([]byte, error) { - if opaque { - c.writeRaw(data) - var respLen int32 - _, err := readInt32(&c.rbuf, 4, &respLen) - if err != nil { - return nil, err - } - - resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen)) - if err != nil { - return nil, err - } - - return resp, nil - } else { +func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) { + // if we sent a v1 handshake, then we must encapsulate the authentication + // request in a saslAuthenticateRequest. otherwise, we read and write raw + // bytes. + if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 { var request = saslAuthenticateRequestV0{Data: data} var response saslAuthenticateResponseV0 @@ -1310,56 +1302,28 @@ func (c *Conn) saslAuthenticate(opaque bool, data []byte) ([]byte, error) { }()) }, ) - - if err != nil { - return nil, err - } - if response.ErrorMessage != "" { - return nil, fmt.Errorf("SASL error message: %v", response.ErrorMessage) - } - if response.ErrorCode != 0 { - return nil, Error(response.ErrorCode) + if err == nil && response.ErrorCode != 0 { + err = Error(response.ErrorCode) } - - return response.Data, nil + return response.Data, err } -} - -type versionRange struct { - minVersion apiVersion - maxVersion apiVersion -} - -// apiVersions gets the supported version ranges from the broker -// -// See http://kafka.apache.org/protocol.html#The_Messages_ApiVersions -func (c *Conn) apiVersions() (map[apiKey]versionRange, error) { - var request apiVersionsRequestV1 - var response apiVersionsResponseV1 - err := c.writeOperation( - func(deadline time.Time, id int32) error { - return c.writeRequest(apiVersionsRequest, v1, id, request) - }, - func(deadline time.Time, size int) error { - return expectZeroSize(func() (remain int, err error) { - return (&response).readFrom(&c.rbuf, size) - }()) - }, - ) - - if err != nil { + // fall back to opaque bytes on the wire. the broker is expecting these if + // it just processed a v0 sasl handshake. + writeInt32(&c.wbuf, int32(len(data))) + if _, err := c.wbuf.Write(data); err != nil { return nil, err } - - if response.ErrorCode != 0 { - return nil, Error(response.ErrorCode) + if err := c.wbuf.Flush(); err != nil { + return nil, err } - ranges := make(map[apiKey]versionRange) - for _, r := range response.APIVersions { - ranges[apiKey(r.APIKey)] = versionRange{minVersion: apiVersion(r.MinVersion), maxVersion: apiVersion(r.MaxVersion)} + var respLen int32 + _, err := readInt32(&c.rbuf, 4, &respLen) + if err != nil { + return nil, err } - return ranges, nil + resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen)) + return resp, err } diff --git a/conn_test.go b/conn_test.go index 21df7b8bc..f7755ff18 100644 --- a/conn_test.go +++ b/conn_test.go @@ -980,6 +980,23 @@ func testBrokers(t *testing.T, conn *Conn) { } } +func TestUnsupportedSASLMechanism(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conn, err := (&Dialer{ + Resolver: &net.Resolver{}, + }).DialContext(ctx, "tcp", "127.0.0.1:9093") + if err != nil { + t.Fatal("failed to open a new kafka connection:", err) + } + defer conn.Close() + + if err := conn.saslHandshake("FOO"); err != UnsupportedSASLMechanism { + t.Errorf("Expected UnsupportedSASLMechanism but got %v", err) + } +} + const benchmarkMessageCount = 100 func BenchmarkConn(b *testing.B) { diff --git a/dialer.go b/dialer.go index 52f8e3090..31f1ae643 100644 --- a/dialer.go +++ b/dialer.go @@ -3,6 +3,7 @@ package kafka import ( "context" "crypto/tls" + "io" "net" "strconv" "strings" @@ -62,8 +63,9 @@ type Dialer struct { // will be used. TLS *tls.Config - // SASLClient enables SASL authentication - SASLClient func() SASLClient + // SASLMechanism configures the Dialer to use SASL authentication. If nil, + // no authentication will be performed. + SASLMechanism SASLMechanism } // Dial connects to the address on the named network. @@ -97,20 +99,7 @@ func (d *Dialer) DialContext(ctx context.Context, network string, address string defer cancel() } - c, err := d.dialContext(ctx, network, address) - if err != nil { - return nil, err - } - - conn := NewConnWith(c, ConnConfig{ClientID: d.ClientID}) - - if d.SASLClient != nil { - if err := d.authenticateSASL(ctx, conn); err != nil { - return nil, err - } - } - - return conn, nil + return d.connect(ctx, network, address, ConnConfig{ClientID: d.ClientID}) } // DialLeader opens a connection to the leader of the partition for a given @@ -133,24 +122,11 @@ func (d *Dialer) DialLeader(ctx context.Context, network string, address string, // descriptor. It's strongly advised to use descriptor of the partition that comes out of // functions LookupPartition or LookupPartitions. func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) { - c, err := d.dialContext(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port))) - if err != nil { - return nil, err - } - - conn := NewConnWith(c, ConnConfig{ + return d.connect(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port)), ConnConfig{ ClientID: d.ClientID, Topic: partition.Topic, Partition: partition.ID, }) - - if d.SASLClient != nil { - if err := d.authenticateSASL(ctx, conn); err != nil { - return nil, err - } - } - - return conn, nil } // LookupLeader searches for the kafka broker that is the leader of the @@ -262,39 +238,58 @@ func (d *Dialer) connectTLS(ctx context.Context, conn net.Conn, config *tls.Conf return } -func (d *Dialer) authenticateSASL(ctx context.Context, conn *Conn) error { - versions, err := conn.apiVersions() +// connect opens a socket connection to the broker, wraps it to create a +// kafka connection, and performs SASL authentication if configured to do so. +func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) { + + c, err := d.dialContext(ctx, network, address) if err != nil { - return err + return nil, err } - saslVersion := versions[saslHandshakeRequest] - var handshakeVersion = v0 - var opaque = true - if saslVersion.maxVersion >= v1 { - opaque = false - handshakeVersion = v1 + conn := NewConnWith(c, connCfg) + + if d.SASLMechanism != nil { + if err := d.authenticateSASL(ctx, conn); err != nil { + _ = conn.Close() + return nil, err + } } - client := d.SASLClient() - _, err = conn.saslHandshake(handshakeVersion, client.Mechanism()) + return conn, nil +} + +// authenticateSASL performs all of the required requests to authenticate this +// connection. If any step fails, this function returns with an error. A nil +// error indicates successful authentication. +// +// In case of error, this function *does not* close the connection. That is the +// responsibility of the caller. +func (d *Dialer) authenticateSASL(ctx context.Context, conn *Conn) error { + mech, state, err := d.SASLMechanism.Start(ctx) if err != nil { - return err // TODO: allow mechanism negotiation by returning the list of supported mechanisms + return err } - - bytes, err := client.Start(ctx) + err = conn.saslHandshake(mech) if err != nil { return err } var completed bool for !completed { - challenge, err := conn.saslAuthenticate(opaque, bytes) - if err != nil { + challenge, err := conn.saslAuthenticate(state) + switch err { + case nil: + case io.EOF: + // the broker may communicate a failed exchange by closing the + // connection (esp. in the case where we're passing opaque sasl + // data over the wire since there's no protocol info). + return SASLAuthenticationFailed + default: return err } - completed, bytes, err = client.Next(ctx, challenge) + completed, state, err = d.SASLMechanism.Next(ctx, challenge) if err != nil { return err } @@ -393,22 +388,6 @@ type Resolver interface { LookupHost(ctx context.Context, host string) (addrs []string, err error) } -// The SASLClient interface is used to enable plugging in different -// SASL implementations at compile time. -type SASLClient interface { - // Mechanism returns the name of the mechanism (eg. SCRAM-SHA-256) - Mechanism() string - - // Start returns the initial client response to send to the server - Start(ctx context.Context) (response []byte, err error) - - // Next computes the response to the server challenge and may be - // called multiple times until completed is set. Completed may be - // set either because the authentication exchange has failed or - // succeeded. If the authentication failed, err must be non-nil. - Next(ctx context.Context, challenge []byte) (completed bool, response []byte, err error) -} - func sleep(ctx context.Context, duration time.Duration) bool { if duration == 0 { select { diff --git a/dialer_test.go b/dialer_test.go index 14bcbbecf..000579212 100644 --- a/dialer_test.go +++ b/dialer_test.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "crypto/x509" - "fmt" "io" "net" "reflect" @@ -22,10 +21,6 @@ func TestDialer(t *testing.T) { scenario: "looking up partitions returns the list of available partitions for a topic", function: testDialerLookupPartitions, }, - { - scenario: "log in using SASL PLAIN authentication", - function: testDialerSASLPlainAuthentication, - }, } for _, test := range tests { @@ -79,36 +74,6 @@ func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) { } } -// PLAINSASLClient is a an SASL PLAIN implementation for testing purposes -// It does not support saslprep -type PLAINSASLClient struct { - Username string - Password string -} - -func (s *PLAINSASLClient) Mechanism() string { return "PLAIN" } - -func (s *PLAINSASLClient) Start(ctx context.Context) ([]byte, error) { - return []byte(fmt.Sprintf("\x00%s\x00%s", s.Username, s.Password)), nil -} - -func (s *PLAINSASLClient) Next(ctx context.Context, challenge []byte) (bool, []byte, error) { - return true, nil, nil -} - -func testDialerSASLPlainAuthentication(t *testing.T, ctx context.Context, d *Dialer) { - var saslDialer = *d - saslDialer.SASLClient = func() SASLClient { - return &PLAINSASLClient{Username: "adminplain", Password: "admin-secret"} - } - - _, err := saslDialer.LookupPartitions(context.Background(), "tcp", "localhost:9094", "non-existing-topic") - if err != nil { - t.Error(err) - return - } -} - func tlsConfig(t *testing.T) *tls.Config { const ( certPEM = `-----BEGIN CERTIFICATE----- diff --git a/docker-compose.yml b/docker-compose.yml index b713c9563..cc393f433 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,26 +7,24 @@ services: - zookeeper ports: - "9092:9092" - - "9094:9094" + - "9093:9093" environment: - KAFKA_VERSION: '0.11.0.3' + KAFKA_VERSION: '0.11.0.1' KAFKA_BROKER_ID: 1 KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' KAFKA_DELETE_TOPIC_ENABLE: 'true' - KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9094' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9094' + KAFKA_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_ADVERTISED_PORT: '9092' KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_MESSAGE_MAX_BYTES: 200000000 - KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL' - KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512,PLAIN + KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' + KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" CUSTOM_INIT_SCRIPT: |- - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; - /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name adminscram - - + echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; + /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram zookeeper: image: wurstmeister/zookeeper ports: diff --git a/error.go b/error.go index 3dfec5521..8034308c8 100644 --- a/error.go +++ b/error.go @@ -222,6 +222,48 @@ func (e Error) Title() string { return "Security Disabled" case BrokerAuthorizationFailed: return "Broker Authorization Failed" + case KafkaStorageError: + return "Kafka Storage Error" + case LogDirNotFound: + return "Log Dir Not Found" + case SASLAuthenticationFailed: + return "SASL Authentication Failed" + case UnknownProducerId: + return "Unknown Producer ID" + case ReassignmentInProgress: + return "Reassignment In Progress" + case DelegationTokenAuthDisabled: + return "Delegation Token Auth Disabled" + case DelegationTokenNotFound: + return "Delegation Token Not Found" + case DelegationTokenOwnerMismatch: + return "Delegation Token Owner Mismatch" + case DelegationTokenRequestNotAllowed: + return "Delegation Token Request Not Allowed" + case DelegationTokenAuthorizationFailed: + return "Delegation Token Authorization Failed" + case DelegationTokenExpired: + return "Delegation Token Expired" + case InvalidPrincipalType: + return "Invalid Principal Type" + case NonEmptyGroup: + return "Non Empty Group" + case GroupIdNotFound: + return "Group ID Not Found" + case FetchSessionIDNotFound: + return "Fetch Session ID Not Found" + case InvalidFetchSessionEpoch: + return "Invalid Fetch Session Epoch" + case ListenerNotFound: + return "Listener Not Found" + case TopicDeletionDisabled: + return "Topic Deletion Disabled" + case FencedLeaderEpoch: + return "Fenced Leader Epoch" + case UnknownLeaderEpoch: + return "Unknown Leader Epoch" + case UnsupportedCompressionType: + return "Unsupported Compression Type" } return "" } @@ -339,6 +381,48 @@ func (e Error) Description() string { return "the security features are disabled" case BrokerAuthorizationFailed: return "the broker authorization failed" + case KafkaStorageError: + return "disk error when trying to access log file on the disk" + case LogDirNotFound: + return "the user-specified log directory is not found in the broker config" + case SASLAuthenticationFailed: + return "SASL Authentication failed" + case UnknownProducerId: + return "the broker could not locate the producer metadata associated with the producer ID" + case ReassignmentInProgress: + return "a partition reassignment is in progress" + case DelegationTokenAuthDisabled: + return "delegation token feature is not enabled" + case DelegationTokenNotFound: + return "delegation token is not found on server" + case DelegationTokenOwnerMismatch: + return "specified principal is not valid owner/renewer" + case DelegationTokenRequestNotAllowed: + return "delegation token requests are not allowed on plaintext/1-way ssl channels and on delegation token authenticated channels" + case DelegationTokenAuthorizationFailed: + return "delegation token authorization failed" + case DelegationTokenExpired: + return "delegation token is expired" + case InvalidPrincipalType: + return "supplied principaltype is not supported" + case NonEmptyGroup: + return "the group is not empty" + case GroupIdNotFound: + return "the group ID does not exist" + case FetchSessionIDNotFound: + return "the fetch session ID was not found" + case InvalidFetchSessionEpoch: + return "the fetch session epoch is invalid" + case ListenerNotFound: + return "there is no listener on the leader broker that matches the listener on which metadata request was processed" + case TopicDeletionDisabled: + return "topic deletion is disabled" + case FencedLeaderEpoch: + return "the leader epoch in the request is older than the epoch on the broker" + case UnknownLeaderEpoch: + return "the leader epoch in the request is newer than the epoch on the broker" + case UnsupportedCompressionType: + return "the requesting client does not support the compression type of given partition" } return "" } diff --git a/sasl.go b/sasl.go new file mode 100644 index 000000000..cdbce0549 --- /dev/null +++ b/sasl.go @@ -0,0 +1,31 @@ +package kafka + +import "context" + +// SASLMechanism implements the SASL state machine. It is initialized by +// calling Start at which point the initial bytes should be sent to the server. +// The caller then loops by passing the server's response into Next and then +// sending Next's returned bytes to the server. Eventually either Next will +// indicate that the authentication has been successfully completed or an error +// will cause the state machine to exit prematurely. +// +// A SASLMechanism must be re-usable, but it does not need to be safe for +// concurrent access by multiple go routines. +type SASLMechanism interface { + // Start begins SASL authentication. It returns the authentication mechanism + // name and "initial response" data (if required by the selected mechanism). + // A non-nil error causes the client to abort the authentication attempt. + // + // A nil ir value is different from a zero-length value. The nil value + // indicates that the selected mechanism does not use an initial response, + // while a zero-length value indicates an empty initial response, which must + // be sent to the server. + // + // In order to ensure that the SASLMechanism is reusable, calling Start must + // reset any internal state. + Start(ctx context.Context) (mech string, ir []byte, err error) + + // Next continues challenge-response authentication. A non-nil error causes + // the client to abort the authentication attempt. + Next(ctx context.Context, challenge []byte) (done bool, response []byte, err error) +} diff --git a/sasl/plain/plain.go b/sasl/plain/plain.go new file mode 100644 index 000000000..15341d081 --- /dev/null +++ b/sasl/plain/plain.go @@ -0,0 +1,23 @@ +package plain + +import ( + "context" + "fmt" +) + +// Mechanism implements the PLAIN mechanism and passes the credentials in clear +// text. +type Mechanism struct { + Username string + Password string +} + +func (m Mechanism) Start(ctx context.Context) (string, []byte, error) { + return "PLAIN", []byte(fmt.Sprintf("\x00%s\x00%s", m.Username, m.Password)), nil +} + +func (m Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) { + // kafka will return error if it rejected the credentials, so we'd only + // arrive here on success. + return true, nil, nil +} diff --git a/sasl/scram/scram.go b/sasl/scram/scram.go new file mode 100644 index 000000000..368de0b14 --- /dev/null +++ b/sasl/scram/scram.go @@ -0,0 +1,83 @@ +package scram + +import ( + "context" + "crypto/sha512" + "hash" + + "github.com/pkg/errors" + "github.com/xdg/scram" +) + +// HashFunction determines the hash function used by SCRAM to protect the user's +// credentials. +type HashFunction int + +const ( + _ HashFunction = iota + SHA256 + SHA512 +) + +func (a HashFunction) name() string { + switch a { + case SHA256: + return "SCRAM-SHA-256" + case SHA512: + return "SCRAM-SHA-512" + } + return "invalid" +} + +func (a HashFunction) hashGenerator() scram.HashGeneratorFcn { + switch a { + case SHA256: + return scram.SHA256 + case SHA512: + // for whatever reason, the scram package doesn't have a predefined + // constant for 512, but we can roll our own. + return scram.HashGeneratorFcn(func() hash.Hash { + return sha512.New() + }) + } + return nil +} + +type mechanism struct { + hash HashFunction + client *scram.Client + convo *scram.ClientConversation +} + +// Mechanism returns a new sasl.Mechanism that will use SCRAM with the provided +// hash function to securely transmit the provided credentials to Kafka. +// +// SCRAM-SHA-256 and SCRAM-SHA-512 were added to Kafka in 0.10.2.0. These +// mechanisms will not work with older versions. +func Mechanism(hash HashFunction, username, password string) (*mechanism, error) { + hashGen := hash.hashGenerator() + if hashGen == nil { + return nil, errors.New("invalid hash function") + } + + client, err := hashGen.NewClient(username, password, "") + if err != nil { + return nil, err + } + + return &mechanism{ + hash: hash, + client: client, + }, nil +} + +func (m *mechanism) Start(ctx context.Context) (string, []byte, error) { + m.convo = m.client.NewConversation() + str, err := m.convo.Step("") + return m.hash.name(), []byte(str), err +} + +func (m *mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) { + str, err := m.convo.Step(string(challenge)) + return m.convo.Done(), []byte(str), err +} diff --git a/sasl_test.go b/sasl_test.go new file mode 100644 index 000000000..a8ffc4e67 --- /dev/null +++ b/sasl_test.go @@ -0,0 +1,99 @@ +package kafka + +import ( + "context" + "testing" + "time" + + "github.com/segmentio/kafka-go/sasl/plain" + "github.com/segmentio/kafka-go/sasl/scram" +) + +const ( + saslTestConnect = "localhost:9093" // connect to sasl listener + saslTestTopic = "test-writer-0" // this topic is guaranteed to exist. +) + +func TestSASL(t *testing.T) { + + t.Parallel() + + tests := []struct { + valid func() SASLMechanism + invalid func() SASLMechanism + minKafka string + }{ + { + valid: func() SASLMechanism { + return plain.Mechanism{ + Username: "adminplain", + Password: "admin-secret", + } + }, + invalid: func() SASLMechanism { + return plain.Mechanism{ + Username: "adminplain", + Password: "badpassword", + } + }, + }, + { + valid: func() SASLMechanism { + mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "admin-secret-256") + return mech + }, + invalid: func() SASLMechanism { + mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "badpassword") + return mech + }, + minKafka: "0.10.2.0", + }, + { + valid: func() SASLMechanism { + mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512") + return mech + }, + invalid: func() SASLMechanism { + mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "badpassword") + return mech + }, + minKafka: "0.10.2.0", + }, + } + + for _, tt := range tests { + name, _, _ := tt.valid().Start(context.Background()) + if !KafkaIsAtLeast(tt.minKafka) { + t.Skip("requires min kafka version " + tt.minKafka) + } + + t.Run(name+" success", func(t *testing.T) { + testConnect(t, tt.valid(), true) + }) + t.Run(name+" failure", func(t *testing.T) { + testConnect(t, tt.invalid(), false) + }) + t.Run(name+" is reusable", func(t *testing.T) { + mech := tt.valid() + testConnect(t, mech, true) + testConnect(t, mech, true) + testConnect(t, mech, true) + }) + + } +} + +func testConnect(t *testing.T, mechanism SASLMechanism, success bool) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + d := Dialer{ + SASLMechanism: mechanism, + } + _, err := d.DialLeader(ctx, "tcp", saslTestConnect, saslTestTopic, 0) + if success && err != nil { + t.Errorf("should have logged in correctly, got err: %v", err) + } else if !success && err == nil { + t.Errorf("should not have logged in correctly") + } +} From 46748abb014fceee8d31fbbde2c53d6489027d23 Mon Sep 17 00:00:00 2001 From: Steve van Loben Sels Date: Fri, 15 Mar 2019 19:45:07 -0700 Subject: [PATCH 3/5] use an interface for scram hash instead of enum --- sasl/scram/scram.go | 73 ++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 38 deletions(-) diff --git a/sasl/scram/scram.go b/sasl/scram/scram.go index 368de0b14..237d23208 100644 --- a/sasl/scram/scram.go +++ b/sasl/scram/scram.go @@ -2,71 +2,68 @@ package scram import ( "context" + "crypto/sha256" "crypto/sha512" "hash" - "github.com/pkg/errors" "github.com/xdg/scram" ) -// HashFunction determines the hash function used by SCRAM to protect the user's +// Algorithm determines the hash function used by SCRAM to protect the user's // credentials. -type HashFunction int +type Algorithm interface { + // Name returns the algorithm's name, e.g. "SCRAM-SHA-256" + Name() string -const ( - _ HashFunction = iota - SHA256 - SHA512 -) + // Hash returns a new hash.Hash. + Hash() hash.Hash +} -func (a HashFunction) name() string { - switch a { - case SHA256: - return "SCRAM-SHA-256" - case SHA512: - return "SCRAM-SHA-512" - } - return "invalid" +type sha256Algo struct{} + +func (sha256Algo) Name() string { + return "SCRAM-SHA-256" } -func (a HashFunction) hashGenerator() scram.HashGeneratorFcn { - switch a { - case SHA256: - return scram.SHA256 - case SHA512: - // for whatever reason, the scram package doesn't have a predefined - // constant for 512, but we can roll our own. - return scram.HashGeneratorFcn(func() hash.Hash { - return sha512.New() - }) - } - return nil +func (sha256Algo) Hash() hash.Hash { + return sha256.New() +} + +type sha512Algo struct{} + +func (sha512Algo) Name() string { + return "SCRAM-SHA-512" +} + +func (sha512Algo) Hash() hash.Hash { + return sha512.New() } +var ( + SHA256 Algorithm = sha256Algo{} + SHA512 Algorithm = sha512Algo{} +) + type mechanism struct { - hash HashFunction + algo Algorithm client *scram.Client convo *scram.ClientConversation } // Mechanism returns a new sasl.Mechanism that will use SCRAM with the provided -// hash function to securely transmit the provided credentials to Kafka. +// Algorithm to securely transmit the provided credentials to Kafka. // // SCRAM-SHA-256 and SCRAM-SHA-512 were added to Kafka in 0.10.2.0. These // mechanisms will not work with older versions. -func Mechanism(hash HashFunction, username, password string) (*mechanism, error) { - hashGen := hash.hashGenerator() - if hashGen == nil { - return nil, errors.New("invalid hash function") - } - +func Mechanism(algo Algorithm, username, password string) (*mechanism, error) { + hashGen := scram.HashGeneratorFcn(algo.Hash) client, err := hashGen.NewClient(username, password, "") if err != nil { return nil, err } return &mechanism{ - hash: hash, + algo: algo, client: client, }, nil } @@ -74,7 +71,7 @@ func Mechanism(hash HashFunction, username, password string) (*mechanism, error) func (m *mechanism) Start(ctx context.Context) (string, []byte, error) { m.convo = m.client.NewConversation() str, err := m.convo.Step("") - return m.hash.name(), []byte(str), err + return m.algo.Name(), []byte(str), err } func (m *mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) { From 11d287dee59648419fb9e5aaead2cf3da266ba96 Mon Sep 17 00:00:00 2001 From: Steve van Loben Sels Date: Mon, 18 Mar 2019 12:04:40 -0700 Subject: [PATCH 4/5] don't return unexported type --- sasl/scram/scram.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sasl/scram/scram.go b/sasl/scram/scram.go index 237d23208..5e96cbf0c 100644 --- a/sasl/scram/scram.go +++ b/sasl/scram/scram.go @@ -6,6 +6,7 @@ import ( "crypto/sha512" "hash" + "github.com/segmentio/kafka-go" "github.com/xdg/scram" ) @@ -55,7 +56,7 @@ type mechanism struct { // // SCRAM-SHA-256 and SCRAM-SHA-512 were added to Kafka in 0.10.2.0. These // mechanisms will not work with older versions. -func Mechanism(algo Algorithm, username, password string) (*mechanism, error) { +func Mechanism(algo Algorithm, username, password string) (kafka.SASLMechanism, error) { hashGen := scram.HashGeneratorFcn(algo.Hash) client, err := hashGen.NewClient(username, password, "") if err != nil { From dd1cda16988e82c180d6216e7e6edc7d97dc35fc Mon Sep 17 00:00:00 2001 From: Steve van Loben Sels Date: Mon, 18 Mar 2019 13:13:32 -0700 Subject: [PATCH 5/5] refactoring... * moved Mechanism -> sasl.Mechanism * moved common test code -> testing package * return sasl.Mechanism in scram constructor instead of *mechanism --- conn_test.go | 3 ++- dialer.go | 4 +++- sasl.go => sasl/sasl.go | 22 +++++++++++----------- sasl_test.go => sasl/sasl_test.go | 27 +++++++++++++++------------ sasl/scram/scram.go | 4 ++-- version_test.go => testing/version.go | 0 6 files changed, 33 insertions(+), 27 deletions(-) rename sasl.go => sasl/sasl.go (54%) rename sasl_test.go => sasl/sasl_test.go (77%) rename version_test.go => testing/version.go (100%) diff --git a/conn_test.go b/conn_test.go index f7755ff18..683888d90 100644 --- a/conn_test.go +++ b/conn_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + ktesting "github.com/segmentio/kafka-go/testing" "golang.org/x/net/nettest" ) @@ -257,7 +258,7 @@ func TestConn(t *testing.T) { ) for _, test := range tests { - if !KafkaIsAtLeast(test.minVersion) { + if !ktesting.KafkaIsAtLeast(test.minVersion) { t.Log("skipping " + test.scenario + " because broker is not at least version " + test.minVersion) continue } diff --git a/dialer.go b/dialer.go index 31f1ae643..6eb0c519c 100644 --- a/dialer.go +++ b/dialer.go @@ -8,6 +8,8 @@ import ( "strconv" "strings" "time" + + "github.com/segmentio/kafka-go/sasl" ) // The Dialer type mirrors the net.Dialer API but is designed to open kafka @@ -65,7 +67,7 @@ type Dialer struct { // SASLMechanism configures the Dialer to use SASL authentication. If nil, // no authentication will be performed. - SASLMechanism SASLMechanism + SASLMechanism sasl.Mechanism } // Dial connects to the address on the named network. diff --git a/sasl.go b/sasl/sasl.go similarity index 54% rename from sasl.go rename to sasl/sasl.go index cdbce0549..ae7121c30 100644 --- a/sasl.go +++ b/sasl/sasl.go @@ -1,17 +1,17 @@ -package kafka +package sasl import "context" -// SASLMechanism implements the SASL state machine. It is initialized by -// calling Start at which point the initial bytes should be sent to the server. -// The caller then loops by passing the server's response into Next and then -// sending Next's returned bytes to the server. Eventually either Next will -// indicate that the authentication has been successfully completed or an error -// will cause the state machine to exit prematurely. +// Mechanism implements the SASL state machine. It is initialized by calling +// Start at which point the initial bytes should be sent to the server. The +// caller then loops by passing the server's response into Next and then sending +// Next's returned bytes to the server. Eventually either Next will indicate +// that the authentication has been successfully completed or an error will +// cause the state machine to exit prematurely. // -// A SASLMechanism must be re-usable, but it does not need to be safe for -// concurrent access by multiple go routines. -type SASLMechanism interface { +// A Mechanism must be re-usable, but it does not need to be safe for concurrent +// access by multiple go routines. +type Mechanism interface { // Start begins SASL authentication. It returns the authentication mechanism // name and "initial response" data (if required by the selected mechanism). // A non-nil error causes the client to abort the authentication attempt. @@ -21,7 +21,7 @@ type SASLMechanism interface { // while a zero-length value indicates an empty initial response, which must // be sent to the server. // - // In order to ensure that the SASLMechanism is reusable, calling Start must + // In order to ensure that the Mechanism is reusable, calling Start must // reset any internal state. Start(ctx context.Context) (mech string, ir []byte, err error) diff --git a/sasl_test.go b/sasl/sasl_test.go similarity index 77% rename from sasl_test.go rename to sasl/sasl_test.go index a8ffc4e67..4ed5c214a 100644 --- a/sasl_test.go +++ b/sasl/sasl_test.go @@ -1,12 +1,15 @@ -package kafka +package sasl_test import ( "context" "testing" "time" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" + ktesting "github.com/segmentio/kafka-go/testing" ) const ( @@ -19,18 +22,18 @@ func TestSASL(t *testing.T) { t.Parallel() tests := []struct { - valid func() SASLMechanism - invalid func() SASLMechanism + valid func() sasl.Mechanism + invalid func() sasl.Mechanism minKafka string }{ { - valid: func() SASLMechanism { + valid: func() sasl.Mechanism { return plain.Mechanism{ Username: "adminplain", Password: "admin-secret", } }, - invalid: func() SASLMechanism { + invalid: func() sasl.Mechanism { return plain.Mechanism{ Username: "adminplain", Password: "badpassword", @@ -38,22 +41,22 @@ func TestSASL(t *testing.T) { }, }, { - valid: func() SASLMechanism { + valid: func() sasl.Mechanism { mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "admin-secret-256") return mech }, - invalid: func() SASLMechanism { + invalid: func() sasl.Mechanism { mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "badpassword") return mech }, minKafka: "0.10.2.0", }, { - valid: func() SASLMechanism { + valid: func() sasl.Mechanism { mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512") return mech }, - invalid: func() SASLMechanism { + invalid: func() sasl.Mechanism { mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "badpassword") return mech }, @@ -63,7 +66,7 @@ func TestSASL(t *testing.T) { for _, tt := range tests { name, _, _ := tt.valid().Start(context.Background()) - if !KafkaIsAtLeast(tt.minKafka) { + if !ktesting.KafkaIsAtLeast(tt.minKafka) { t.Skip("requires min kafka version " + tt.minKafka) } @@ -83,11 +86,11 @@ func TestSASL(t *testing.T) { } } -func testConnect(t *testing.T, mechanism SASLMechanism, success bool) { +func testConnect(t *testing.T, mechanism sasl.Mechanism, success bool) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - d := Dialer{ + d := kafka.Dialer{ SASLMechanism: mechanism, } _, err := d.DialLeader(ctx, "tcp", saslTestConnect, saslTestTopic, 0) diff --git a/sasl/scram/scram.go b/sasl/scram/scram.go index 5e96cbf0c..495630383 100644 --- a/sasl/scram/scram.go +++ b/sasl/scram/scram.go @@ -6,7 +6,7 @@ import ( "crypto/sha512" "hash" - "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" "github.com/xdg/scram" ) @@ -56,7 +56,7 @@ type mechanism struct { // // SCRAM-SHA-256 and SCRAM-SHA-512 were added to Kafka in 0.10.2.0. These // mechanisms will not work with older versions. -func Mechanism(algo Algorithm, username, password string) (kafka.SASLMechanism, error) { +func Mechanism(algo Algorithm, username, password string) (sasl.Mechanism, error) { hashGen := scram.HashGeneratorFcn(algo.Hash) client, err := hashGen.NewClient(username, password, "") if err != nil { diff --git a/version_test.go b/testing/version.go similarity index 100% rename from version_test.go rename to testing/version.go