Skip to content

Commit

Permalink
Merge branch 'master' into fix-sink-goroutine-leak
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed Jun 4, 2021
2 parents 32673df + 8983126 commit afd9d17
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 2 deletions.
15 changes: 15 additions & 0 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,21 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
config.Credential.KeyPath = s
}

s = sinkURI.Query().Get("sasl-user")
if s != "" {
config.SaslScram.SaslUser = s
}

s = sinkURI.Query().Get("sasl-password")
if s != "" {
config.SaslScram.SaslPassword = s
}

s = sinkURI.Query().Get("sasl-mechanism")
if s != "" {
config.SaslScram.SaslMechanism = s
}

s = sinkURI.Query().Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
Expand Down
18 changes: 16 additions & 2 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ type Config struct {
Compression string
ClientID string
Credential *security.Credential
// TODO support SASL authentication

SaslScram *security.SaslScram
// control whether to create topic and verify partition number
TopicPreProcess bool
}
Expand All @@ -58,6 +57,7 @@ func NewKafkaConfig() Config {
ReplicationFactor: 1,
Compression: "none",
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
TopicPreProcess: true,
}
}
Expand Down Expand Up @@ -464,5 +464,19 @@ func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
}
}

if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 {
config.Net.SASL.Enable = true
config.Net.SASL.User = c.SaslScram.SaslUser
config.Net.SASL.Password = c.SaslScram.SaslPassword
config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism)
if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} }
} else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} }
} else {
return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512")
}
}

return config, err
}
16 changes: 16 additions & 0 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,22 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
}
_, err = newSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory")

saslConfig := NewKafkaConfig()
saslConfig.Version = "2.6.0"
saslConfig.ClientID = "test-sasl-scram"
saslConfig.SaslScram = &security.SaslScram{
SaslUser: "user",
SaslPassword: "password",
SaslMechanism: sarama.SASLTypeSCRAMSHA256,
}

cfg, err := newSaramaConfigImpl(ctx, saslConfig)
c.Assert(err, check.IsNil)
c.Assert(cfg, check.NotNil)
c.Assert(cfg.Net.SASL.User, check.Equals, "user")
c.Assert(cfg.Net.SASL.Password, check.Equals, "password")
c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256"))
}

func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (
github.com/tinylib/msgp v1.1.0
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.etcd.io/bbolt v1.3.4 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b
go.uber.org/zap v1.16.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,9 @@ github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1 h1:d71/KA0LhvkrJ/Ok+Wx9qK7bU8meK
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1/go.mod h1:xlngVLeyQ/Qi05oQxhQ+oTuqa03RjMwMfk/7/TCs+QI=
github.com/vmihailenco/tagparser v0.1.1 h1:quXMXlA39OCbd2wAdTsGDlK9RkOk6Wuw+x37wVyIuWY=
github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
Expand Down
26 changes: 26 additions & 0 deletions pkg/security/sasl_scram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package security

// SaslScram holds necessary path parameter to support sasl-scram
type SaslScram struct {
SaslUser string `toml:"sasl-user" json:"sasl-user"`
SaslPassword string `toml:"sasl-password" json:"sasl-password"`
SaslMechanism string `toml:"sasl-mechanism" json:"sasl-mechanism"`
}

// IsSaslScramEnabled checks whether SASL SCRAM is enabled or not.
func (s *SaslScram) IsSaslScramEnabled() bool {
return len(s.SaslUser) != 0
}
57 changes: 57 additions & 0 deletions pkg/security/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package security

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var (
// SHA256 func
SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
// SHA512 func
SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
)

// XDGSCRAMClient xdg scram client
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

// Begin xdg scram client Begin
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

// Step xdg scram client Step
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

// Done xdg scram client Done
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

0 comments on commit afd9d17

Please sign in to comment.