diff --git a/.circleci/config.yml b/.circleci/config.yml
index 93b895eaa..e7505597e 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -8,7 +8,7 @@ jobs:
     - image: circleci/golang
     - image: wurstmeister/zookeeper
       ports: ['2181:2181']
-    - image: wurstmeister/kafka:0.10.1.0
+    - image: wurstmeister/kafka:0.10.1.1
       ports: ['9092:9092']
       environment:
         KAFKA_BROKER_ID: '1'
@@ -18,6 +18,12 @@ jobs:
         KAFKA_ADVERTISED_PORT: '9092'
         KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
         KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
+        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
+        CUSTOM_INIT_SCRIPT: |-
+          echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
     steps:
     - checkout
     - setup_remote_docker: { reusable: true, docker_layer_caching: true }
@@ -32,8 +38,8 @@ jobs:
     - image: circleci/golang
     - image: wurstmeister/zookeeper
       ports: ['2181:2181']
-    - image: wurstmeister/kafka:0.11.0.1
-      ports: ['9092:9092']
+    - image: wurstmeister/kafka:2.11-0.11.0.3
+      ports: ['9092:9092','9093:9093']
       environment:
         KAFKA_BROKER_ID: '1'
         KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
@@ -42,6 +48,13 @@
         KAFKA_ADVERTISED_PORT: '9092'
         KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
         KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
+        CUSTOM_INIT_SCRIPT: |-
+          echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
+          /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
     steps:
     - checkout
     - setup_remote_docker: { reusable: true, docker_layer_caching: true }
@@ -57,7 +70,7 @@
     - image: wurstmeister/zookeeper
       ports: ['2181:2181']
     - image: wurstmeister/kafka:2.11-1.1.1
-      ports: ['9092:9092']
+      ports: ['9092:9092','9093:9093']
       environment:
         KAFKA_BROKER_ID: '1'
         KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
@@ -66,6 +79,13 @@
         KAFKA_ADVERTISED_PORT: '9092'
         KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
         KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
+        CUSTOM_INIT_SCRIPT: |-
+          echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
+          /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
     steps:
     - checkout
     - setup_remote_docker: { reusable: true, docker_layer_caching: true }
@@ -81,7 +101,7 @@
     - image: wurstmeister/zookeeper
       ports: ['2181:2181']
     - image: wurstmeister/kafka:2.12-2.1.0
-      ports: ['9092:9092']
+      ports: ['9092:9092','9093:9093']
       environment:
         KAFKA_BROKER_ID: '1'
         KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
@@ -90,6 +110,13 @@
         KAFKA_ADVERTISED_PORT: '9092'
         KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
         KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+        KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
+        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
+        CUSTOM_INIT_SCRIPT: |-
+          echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
+          /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
     steps:
     - checkout
     - setup_remote_docker: { reusable: true, docker_layer_caching: true }
diff --git a/conn.go b/conn.go
index bcfb14c3e..00ad8689e 100644
--- a/conn.go
+++ b/conn.go
@@ -1242,3 +1242,88 @@ func (d *connDeadline) unsetConnWriteDeadline() {
 	d.wconn = nil
 	d.mutex.Unlock()
 }
+
+// saslHandshake sends the SASL handshake message. This will determine whether
+// the Mechanism is supported by the cluster. If it's not, this function will
+// error out with UnsupportedSASLMechanism.
+//
+// If the mechanism is unsupported, the handshake request will reply with the
+// list of the cluster's configured mechanisms, which could potentially be used
+// to facilitate negotiation. At the moment, we are not negotiating the
+// mechanism as we believe that brokers are usually known to the client, and
+// therefore the client should already know which mechanisms are supported.
+//
+// See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake
+func (c *Conn) saslHandshake(mechanism string) error {
+	// The wire format for V0 and V1 is identical, but the version
+	// number will affect how the SASL authentication
+	// challenge/responses are sent
+	var resp saslHandshakeResponseV0
+	version := v0
+	if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 {
+		version = v1
+	}
+
+	err := c.writeOperation(
+		func(deadline time.Time, id int32) error {
+			return c.writeRequest(saslHandshakeRequest, version, id, &saslHandshakeRequestV0{Mechanism: mechanism})
+		},
+		func(deadline time.Time, size int) error {
+			return expectZeroSize(func() (int, error) {
+				return (&resp).readFrom(&c.rbuf, size)
+			}())
+		},
+	)
+	if err == nil && resp.ErrorCode != 0 {
+		err = Error(resp.ErrorCode)
+	}
+	return err
+}
+
+// saslAuthenticate sends the SASL authenticate message. This function must
+// be immediately preceded by a successful saslHandshake.
+//
+// See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
+func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
+	// if we sent a v1 handshake, then we must encapsulate the authentication
+	// request in a saslAuthenticateRequest. otherwise, we read and write raw
+	// bytes.
+	if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 {
+		var request = saslAuthenticateRequestV0{Data: data}
+		var response saslAuthenticateResponseV0
+
+		err := c.writeOperation(
+			func(deadline time.Time, id int32) error {
+				return c.writeRequest(saslAuthenticateRequest, v0, id, request)
+			},
+			func(deadline time.Time, size int) error {
+				return expectZeroSize(func() (remain int, err error) {
+					return (&response).readFrom(&c.rbuf, size)
+				}())
+			},
+		)
+		if err == nil && response.ErrorCode != 0 {
+			err = Error(response.ErrorCode)
+		}
+		return response.Data, err
+	}
+
+	// fall back to opaque bytes on the wire. the broker is expecting these if
+	// it just processed a v0 sasl handshake.
+	writeInt32(&c.wbuf, int32(len(data)))
+	if _, err := c.wbuf.Write(data); err != nil {
+		return nil, err
+	}
+	if err := c.wbuf.Flush(); err != nil {
+		return nil, err
+	}
+
+	var respLen int32
+	_, err := readInt32(&c.rbuf, 4, &respLen)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen))
+	return resp, err
+}
diff --git a/conn_test.go b/conn_test.go
index 21df7b8bc..683888d90 100644
--- a/conn_test.go
+++ b/conn_test.go
@@ -10,6 +10,7 @@ import (
 	"testing"
 	"time"
 
+	ktesting "github.com/segmentio/kafka-go/testing"
 	"golang.org/x/net/nettest"
 )
 
@@ -257,7 +258,7 @@ func TestConn(t *testing.T) {
 	)
 
 	for _, test := range tests {
-		if !KafkaIsAtLeast(test.minVersion) {
+		if !ktesting.KafkaIsAtLeast(test.minVersion) {
 			t.Log("skipping " + test.scenario + " because broker is not at least version " + test.minVersion)
 			continue
 		}
@@ -980,6 +981,23 @@ func testBrokers(t *testing.T, conn *Conn) {
 	}
 }
 
+func TestUnsupportedSASLMechanism(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	conn, err := (&Dialer{
+		Resolver: &net.Resolver{},
+	}).DialContext(ctx, "tcp", "127.0.0.1:9093")
+	if err != nil {
+		t.Fatal("failed to open a new kafka connection:", err)
+	}
+	defer conn.Close()
+
+	if err := conn.saslHandshake("FOO"); err != UnsupportedSASLMechanism {
+		t.Errorf("Expected UnsupportedSASLMechanism but got %v", err)
+	}
+}
+
 const benchmarkMessageCount = 100
 
 func BenchmarkConn(b *testing.B) {
diff --git a/dialer.go b/dialer.go
index ba6e7db2e..6eb0c519c 100644
--- a/dialer.go
+++ b/dialer.go
@@ -3,10 +3,13 @@ package kafka
 import (
 	"context"
 	"crypto/tls"
+	"io"
 	"net"
 	"strconv"
 	"strings"
 	"time"
+
+	"github.com/segmentio/kafka-go/sasl"
 )
 
 // The Dialer type mirrors the net.Dialer API but is designed to open kafka
@@ -61,6 +64,10 @@ type Dialer struct {
 	// TLS enables Dialer to open secure connections. If nil, standard net.Conn
 	// will be used.
 	TLS *tls.Config
+
+	// SASLMechanism configures the Dialer to use SASL authentication. If nil,
+	// no authentication will be performed.
+	SASLMechanism sasl.Mechanism
 }
 
 // Dial connects to the address on the named network.
@@ -94,11 +101,7 @@ func (d *Dialer) DialContext(ctx context.Context, network string, address string
 		defer cancel()
 	}
 
-	c, err := d.dialContext(ctx, network, address)
-	if err != nil {
-		return nil, err
-	}
-	return NewConnWith(c, ConnConfig{ClientID: d.ClientID}), nil
+	return d.connect(ctx, network, address, ConnConfig{ClientID: d.ClientID})
 }
 
 // DialLeader opens a connection to the leader of the partition for a given
@@ -121,16 +124,11 @@ func (d *Dialer) DialLeader(ctx context.Context, network string, address string,
 // descriptor. It's strongly advised to use descriptor of the partition that comes out of
 // functions LookupPartition or LookupPartitions.
 func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) {
-	c, err := d.dialContext(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port)))
-	if err != nil {
-		return nil, err
-	}
-
-	return NewConnWith(c, ConnConfig{
+	return d.connect(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port)), ConnConfig{
 		ClientID:  d.ClientID,
 		Topic:     partition.Topic,
 		Partition: partition.ID,
-	}), nil
+	})
 }
 
 // LookupLeader searches for the kafka broker that is the leader of the
@@ -242,6 +240,66 @@ func (d *Dialer) connectTLS(ctx context.Context, conn net.Conn, config *tls.Conf
 	return
 }
 
+// connect opens a socket connection to the broker, wraps it to create a
+// kafka connection, and performs SASL authentication if configured to do so.
+func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) {
+
+	c, err := d.dialContext(ctx, network, address)
+	if err != nil {
+		return nil, err
+	}
+
+	conn := NewConnWith(c, connCfg)
+
+	if d.SASLMechanism != nil {
+		if err := d.authenticateSASL(ctx, conn); err != nil {
+			_ = conn.Close()
+			return nil, err
+		}
+	}
+
+	return conn, nil
+}
+
+// authenticateSASL performs all of the required requests to authenticate this
+// connection. If any step fails, this function returns with an error. A nil
+// error indicates successful authentication.
+//
+// In case of error, this function *does not* close the connection. That is the
+// responsibility of the caller.
+func (d *Dialer) authenticateSASL(ctx context.Context, conn *Conn) error {
+	mech, state, err := d.SASLMechanism.Start(ctx)
+	if err != nil {
+		return err
+	}
+	err = conn.saslHandshake(mech)
+	if err != nil {
+		return err
+	}
+
+	var completed bool
+	for !completed {
+		challenge, err := conn.saslAuthenticate(state)
+		switch err {
+		case nil:
+		case io.EOF:
+			// the broker may communicate a failed exchange by closing the
+			// connection (esp. in the case where we're passing opaque sasl
+			// data over the wire since there's no protocol info).
+			return SASLAuthenticationFailed
+		default:
+			return err
+		}
+
+		completed, state, err = d.SASLMechanism.Next(ctx, challenge)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (d *Dialer) dialContext(ctx context.Context, network string, address string) (net.Conn, error) {
 	if r := d.Resolver; r != nil {
 		host, port := splitHostPort(address)
diff --git a/docker-compose.yml b/docker-compose.yml
index 6c59b8760..cc393f433 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,12 +1,13 @@
 version: "3"
 services:
   kafka:
-    image: wurstmeister/kafka:0.11.0.1
+    image: wurstmeister/kafka:2.11-0.11.0.3
     restart: on-failure:3
     links:
     - zookeeper
     ports:
     - "9092:9092"
+    - "9093:9093"
     environment:
       KAFKA_VERSION: '0.11.0.1'
       KAFKA_BROKER_ID: 1
@@ -17,7 +18,13 @@ services:
       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
       KAFKA_MESSAGE_MAX_BYTES: 200000000
-
+      KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
+      CUSTOM_INIT_SCRIPT: |-
+        echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
+        /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
   zookeeper:
     image: wurstmeister/zookeeper
     ports:
diff --git a/protocol.go b/protocol.go
index b588aca72..039b1cc71 100644
--- a/protocol.go
+++ b/protocol.go
@@ -22,9 +22,11 @@ const (
 	syncGroupRequest        apiKey = 14
 	describeGroupsRequest   apiKey = 15
 	listGroupsRequest       apiKey = 16
+	saslHandshakeRequest    apiKey = 17
 	apiVersionsRequest      apiKey = 18
 	createTopicsRequest     apiKey = 19
 	deleteTopicsRequest     apiKey = 20
+	saslAuthenticateRequest apiKey = 36
 )
 
 type apiVersion int16
diff --git a/sasl/plain/plain.go b/sasl/plain/plain.go
new file mode 100644
index 000000000..15341d081
--- /dev/null
+++ b/sasl/plain/plain.go
@@ -0,0 +1,23 @@
+package plain
+
+import (
+	"context"
+	"fmt"
+)
+
+// Mechanism implements the PLAIN mechanism and passes the credentials in clear
+// text.
+type Mechanism struct {
+	Username string
+	Password string
+}
+
+func (m Mechanism) Start(ctx context.Context) (string, []byte, error) {
+	return "PLAIN", []byte(fmt.Sprintf("\x00%s\x00%s", m.Username, m.Password)), nil
+}
+
+func (m Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
+	// kafka will return error if it rejected the credentials, so we'd only
+	// arrive here on success.
+	return true, nil, nil
+}
diff --git a/sasl/sasl.go b/sasl/sasl.go
new file mode 100644
index 000000000..ae7121c30
--- /dev/null
+++ b/sasl/sasl.go
@@ -0,0 +1,31 @@
+package sasl
+
+import "context"
+
+// Mechanism implements the SASL state machine. It is initialized by calling
+// Start at which point the initial bytes should be sent to the server. The
+// caller then loops by passing the server's response into Next and then sending
+// Next's returned bytes to the server. Eventually either Next will indicate
+// that the authentication has been successfully completed or an error will
+// cause the state machine to exit prematurely.
+//
+// A Mechanism must be reusable, but it does not need to be safe for concurrent
+// access by multiple goroutines.
+type Mechanism interface {
+	// Start begins SASL authentication. It returns the authentication mechanism
+	// name and "initial response" data (if required by the selected mechanism).
+	// A non-nil error causes the client to abort the authentication attempt.
+	//
+	// A nil ir value is different from a zero-length value. The nil value
+	// indicates that the selected mechanism does not use an initial response,
+	// while a zero-length value indicates an empty initial response, which must
+	// be sent to the server.
+	//
+	// In order to ensure that the Mechanism is reusable, calling Start must
+	// reset any internal state.
+	Start(ctx context.Context) (mech string, ir []byte, err error)
+
+	// Next continues challenge-response authentication. A non-nil error causes
+	// the client to abort the authentication attempt.
+	Next(ctx context.Context, challenge []byte) (done bool, response []byte, err error)
+}
diff --git a/sasl/sasl_test.go b/sasl/sasl_test.go
new file mode 100644
index 000000000..4ed5c214a
--- /dev/null
+++ b/sasl/sasl_test.go
@@ -0,0 +1,102 @@
+package sasl_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/kafka-go/sasl"
+	"github.com/segmentio/kafka-go/sasl/plain"
+	"github.com/segmentio/kafka-go/sasl/scram"
+	ktesting "github.com/segmentio/kafka-go/testing"
+)
+
+const (
+	saslTestConnect = "localhost:9093" // connect to sasl listener
+	saslTestTopic   = "test-writer-0"  // this topic is guaranteed to exist.
+)
+
+func TestSASL(t *testing.T) {
+
+	t.Parallel()
+
+	tests := []struct {
+		valid    func() sasl.Mechanism
+		invalid  func() sasl.Mechanism
+		minKafka string
+	}{
+		{
+			valid: func() sasl.Mechanism {
+				return plain.Mechanism{
+					Username: "adminplain",
+					Password: "admin-secret",
+				}
+			},
+			invalid: func() sasl.Mechanism {
+				return plain.Mechanism{
+					Username: "adminplain",
+					Password: "badpassword",
+				}
+			},
+		},
+		{
+			valid: func() sasl.Mechanism {
+				mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "admin-secret-256")
+				return mech
+			},
+			invalid: func() sasl.Mechanism {
+				mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "badpassword")
+				return mech
+			},
+			minKafka: "0.10.2.0",
+		},
+		{
+			valid: func() sasl.Mechanism {
+				mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512")
+				return mech
+			},
+			invalid: func() sasl.Mechanism {
+				mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "badpassword")
+				return mech
+			},
+			minKafka: "0.10.2.0",
+		},
+	}
+
+	for _, tt := range tests {
+		name, _, _ := tt.valid().Start(context.Background())
+		if !ktesting.KafkaIsAtLeast(tt.minKafka) {
+			t.Skip("requires min kafka version " + tt.minKafka)
+		}
+
+		t.Run(name+" success", func(t *testing.T) {
+			testConnect(t, tt.valid(), true)
+		})
+		t.Run(name+" failure", func(t *testing.T) {
+			testConnect(t, tt.invalid(), false)
+		})
+		t.Run(name+" is reusable", func(t *testing.T) {
+			mech := tt.valid()
+			testConnect(t, mech, true)
+			testConnect(t, mech, true)
+			testConnect(t, mech, true)
+		})
+
+	}
+}
+
+func testConnect(t *testing.T, mechanism sasl.Mechanism, success bool) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	d := kafka.Dialer{
+		SASLMechanism: mechanism,
+	}
+	_, err := d.DialLeader(ctx, "tcp", saslTestConnect, saslTestTopic, 0)
+	if success && err != nil {
+		t.Errorf("should have logged in correctly, got err: %v", err)
+	} else if !success && err == nil {
+		t.Errorf("should not have logged in correctly")
+	}
+}
diff --git a/sasl/scram/scram.go b/sasl/scram/scram.go
new file mode 100644
index 000000000..495630383
--- /dev/null
+++ b/sasl/scram/scram.go
@@ -0,0 +1,81 @@
+package scram
+
+import (
+	"context"
+	"crypto/sha256"
+	"crypto/sha512"
+	"hash"
+
+	"github.com/segmentio/kafka-go/sasl"
+	"github.com/xdg/scram"
+)
+
+// Algorithm determines the hash function used by SCRAM to protect the user's
+// credentials.
+type Algorithm interface {
+	// Name returns the algorithm's name, e.g. "SCRAM-SHA-256"
+	Name() string
+
+	// Hash returns a new hash.Hash.
+	Hash() hash.Hash
+}
+
+type sha256Algo struct{}
+
+func (sha256Algo) Name() string {
+	return "SCRAM-SHA-256"
+}
+
+func (sha256Algo) Hash() hash.Hash {
+	return sha256.New()
+}
+
+type sha512Algo struct{}
+
+func (sha512Algo) Name() string {
+	return "SCRAM-SHA-512"
+}
+
+func (sha512Algo) Hash() hash.Hash {
+	return sha512.New()
+}
+
+var (
+	SHA256 Algorithm = sha256Algo{}
+	SHA512 Algorithm = sha512Algo{}
+)
+
+type mechanism struct {
+	algo   Algorithm
+	client *scram.Client
+	convo  *scram.ClientConversation
+}
+
+// Mechanism returns a new sasl.Mechanism that will use SCRAM with the provided
+// Algorithm to securely transmit the provided credentials to Kafka.
+//
+// SCRAM-SHA-256 and SCRAM-SHA-512 were added to Kafka in 0.10.2.0. These
+// mechanisms will not work with older versions.
+func Mechanism(algo Algorithm, username, password string) (sasl.Mechanism, error) {
+	hashGen := scram.HashGeneratorFcn(algo.Hash)
+	client, err := hashGen.NewClient(username, password, "")
+	if err != nil {
+		return nil, err
+	}
+
+	return &mechanism{
+		algo:   algo,
+		client: client,
+	}, nil
+}
+
+func (m *mechanism) Start(ctx context.Context) (string, []byte, error) {
+	m.convo = m.client.NewConversation()
+	str, err := m.convo.Step("")
+	return m.algo.Name(), []byte(str), err
+}
+
+func (m *mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
+	str, err := m.convo.Step(string(challenge))
+	return m.convo.Done(), []byte(str), err
+}
diff --git a/saslauthenticate.go b/saslauthenticate.go
new file mode 100644
index 000000000..487c96edc
--- /dev/null
+++ b/saslauthenticate.go
@@ -0,0 +1,54 @@
+package kafka
+
+import (
+	"bufio"
+)
+
+type saslAuthenticateRequestV0 struct {
+	// Data holds the SASL payload
+	Data []byte
+}
+
+func (t saslAuthenticateRequestV0) size() int32 {
+	return sizeofBytes(t.Data)
+}
+
+func (t *saslAuthenticateRequestV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
+	return readBytes(r, sz, &t.Data)
+}
+
+func (t saslAuthenticateRequestV0) writeTo(w *bufio.Writer) {
+	writeBytes(w, t.Data)
+}
+
+type saslAuthenticateResponseV0 struct {
+	// ErrorCode holds response error code
+	ErrorCode int16
+
+	ErrorMessage string
+
+	Data []byte
+}
+
+func (t saslAuthenticateResponseV0) size() int32 {
+	return sizeofInt16(t.ErrorCode) + sizeofString(t.ErrorMessage) + sizeofBytes(t.Data)
+}
+
+func (t saslAuthenticateResponseV0) writeTo(w *bufio.Writer) {
+	writeInt16(w, t.ErrorCode)
+	writeString(w, t.ErrorMessage)
+	writeBytes(w, t.Data)
+}
+
+func (t *saslAuthenticateResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
+	if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil {
+		return
+	}
+	if remain, err = readString(r, remain, &t.ErrorMessage); err != nil {
+		return
+	}
+	if remain, err = readBytes(r, remain, &t.Data); err != nil {
+		return
+	}
+	return
+}
diff --git a/saslauthenticate_test.go b/saslauthenticate_test.go
new file mode 100644
index 000000000..b064fcaa1
--- /dev/null
+++ b/saslauthenticate_test.go
@@ -0,0 +1,62 @@
+package kafka
+
+import (
+	"bufio"
+	"bytes"
+	"reflect"
+	"testing"
+)
+
+func TestSASLAuthenticateRequestV0(t *testing.T) {
+	item := saslAuthenticateRequestV0{
+		Data: []byte("\x00user\x00pass"),
+	}
+
+	buf := bytes.NewBuffer(nil)
+	w := bufio.NewWriter(buf)
+	item.writeTo(w)
+	w.Flush()
+
+	var found saslAuthenticateRequestV0
+	remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
+	if err != nil {
+		t.Error(err)
+		t.FailNow()
+	}
+	if remain != 0 {
+		t.Errorf("expected 0 remain, got %v", remain)
+		t.FailNow()
+	}
+	if !reflect.DeepEqual(item, found) {
+		t.Error("expected item and found to be the same")
+		t.FailNow()
+	}
+}
+
+func TestSASLAuthenticateResponseV0(t *testing.T) {
+	item := saslAuthenticateResponseV0{
+		ErrorCode:    2,
+		ErrorMessage: "Message",
+		Data:         []byte("bytes"),
+	}
+
+	buf := bytes.NewBuffer(nil)
+	w := bufio.NewWriter(buf)
+	item.writeTo(w)
+	w.Flush()
+
+	var found saslAuthenticateResponseV0
+	remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
+	if err != nil {
+		t.Error(err)
+		t.FailNow()
+	}
+	if remain != 0 {
+		t.Errorf("expected 0 remain, got %v", remain)
+		t.FailNow()
+	}
+	if !reflect.DeepEqual(item, found) {
+		t.Error("expected item and found to be the same")
+		t.FailNow()
+	}
+}
diff --git a/saslhandshake.go b/saslhandshake.go
new file mode 100644
index 000000000..aa37e1122
--- /dev/null
+++ b/saslhandshake.go
@@ -0,0 +1,53 @@
+package kafka
+
+import (
+	"bufio"
+)
+
+// saslHandshakeRequestV0 implements the format for V0 and V1 SASL
+// requests (they are identical)
+type saslHandshakeRequestV0 struct {
+	// Mechanism holds the SASL Mechanism chosen by the client.
+	Mechanism string
+}
+
+func (t saslHandshakeRequestV0) size() int32 {
+	return sizeofString(t.Mechanism)
+}
+
+func (t *saslHandshakeRequestV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
+	return readString(r, sz, &t.Mechanism)
+}
+
+func (t saslHandshakeRequestV0) writeTo(w *bufio.Writer) {
+	writeString(w, t.Mechanism)
+}
+
+// saslHandshakeResponseV0 implements the format for V0 and V1 SASL
+// responses (they are identical)
+type saslHandshakeResponseV0 struct {
+	// ErrorCode holds response error code
+	ErrorCode int16
+
+	// Array of mechanisms enabled in the server
+	EnabledMechanisms []string
+}
+
+func (t saslHandshakeResponseV0) size() int32 {
+	return sizeofInt16(t.ErrorCode) + sizeofStringArray(t.EnabledMechanisms)
+}
+
+func (t saslHandshakeResponseV0) writeTo(w *bufio.Writer) {
+	writeInt16(w, t.ErrorCode)
+	writeStringArray(w, t.EnabledMechanisms)
+}
+
+func (t *saslHandshakeResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
+	if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil {
+		return
+	}
+	if remain, err = readStringArray(r, remain, &t.EnabledMechanisms); err != nil {
+		return
+	}
+	return
+}
diff --git a/saslhandshake_test.go b/saslhandshake_test.go
new file mode 100644
index 000000000..0b8e150b3
--- /dev/null
+++ b/saslhandshake_test.go
@@ -0,0 +1,61 @@
+package kafka
+
+import (
+	"bufio"
+	"bytes"
+	"reflect"
+	"testing"
+)
+
+func TestSASLHandshakeRequestV0(t *testing.T) {
+	item := saslHandshakeRequestV0{
+		Mechanism: "SCRAM-SHA-512",
+	}
+
+	buf := bytes.NewBuffer(nil)
+	w := bufio.NewWriter(buf)
+	item.writeTo(w)
+	w.Flush()
+
+	var found saslHandshakeRequestV0
+	remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
+	if err != nil {
+		t.Error(err)
+		t.FailNow()
+	}
+	if remain != 0 {
+		t.Errorf("expected 0 remain, got %v", remain)
+		t.FailNow()
+	}
+	if !reflect.DeepEqual(item, found) {
+		t.Error("expected item and found to be the same")
+		t.FailNow()
+	}
+}
+
+func TestSASLHandshakeResponseV0(t *testing.T) {
+	item := saslHandshakeResponseV0{
+		ErrorCode:         2,
+		EnabledMechanisms: []string{"PLAIN", "SCRAM-SHA-512"},
+	}
+
+	buf := bytes.NewBuffer(nil)
+	w := bufio.NewWriter(buf)
+	item.writeTo(w)
+	w.Flush()
+
+	var found saslHandshakeResponseV0
+	remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
+	if err != nil {
+		t.Error(err)
+		t.FailNow()
+	}
+	if remain != 0 {
+		t.Errorf("expected 0 remain, got %v", remain)
+		t.FailNow()
+	}
+	if !reflect.DeepEqual(item, found) {
+		t.Error("expected item and found to be the same")
+		t.FailNow()
+	}
+}
diff --git a/version_test.go b/testing/version.go
similarity index 100%
rename from version_test.go
rename to testing/version.go
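Usage note (not part of the patch): the sketch below shows how a client could authenticate with the new Dialer.SASLMechanism field. It is a minimal, assumed example: the dialWith helper and main function are illustrative only, and the broker address (localhost:9093), topic (test-writer-0), and credentials are the ones provisioned by the docker-compose.yml change above; a real deployment would use its own.

package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl"
	"github.com/segmentio/kafka-go/sasl/plain"
	"github.com/segmentio/kafka-go/sasl/scram"
)

// dialWith opens a connection to the partition leader through the broker's
// SASL listener, authenticating with the given mechanism. The address and
// topic below are the ones from the docker-compose setup in this change.
func dialWith(m sasl.Mechanism) (*kafka.Conn, error) {
	d := &kafka.Dialer{
		Timeout:       10 * time.Second,
		SASLMechanism: m, // a nil mechanism skips authentication entirely
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return d.DialLeader(ctx, "tcp", "localhost:9093", "test-writer-0", 0)
}

func main() {
	// PLAIN passes credentials in clear text; outside of a test setup it
	// should only be used together with TLS.
	conn, err := dialWith(plain.Mechanism{Username: "adminplain", Password: "admin-secret"})
	if err != nil {
		log.Fatal("PLAIN authentication failed: ", err)
	}
	conn.Close()

	// SCRAM requires Kafka 0.10.2.0 or later.
	mech, err := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512")
	if err != nil {
		log.Fatal(err)
	}
	conn, err = dialWith(mech)
	if err != nil {
		log.Fatal("SCRAM-SHA-512 authentication failed: ", err)
	}
	conn.Close()
}

Because SASLMechanism defaults to nil and the handshake is only attempted when it is set, existing callers of Dial, DialContext, DialLeader, and DialPartition are unaffected by this change.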