Skip to content

Commit

Permalink
sink/kafka: add sasl scram support for kafka sink (#1789) (#1941)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed Jul 7, 2021
1 parent 9111d8f commit 0a9e759
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 11 deletions.
15 changes: 15 additions & 0 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,21 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
config.Credential.KeyPath = s
}

s = sinkURI.Query().Get("sasl-user")
if s != "" {
config.SaslScram.SaslUser = s
}

s = sinkURI.Query().Get("sasl-password")
if s != "" {
config.SaslScram.SaslPassword = s
}

s = sinkURI.Query().Get("sasl-mechanism")
if s != "" {
config.SaslScram.SaslMechanism = s
}

s = sinkURI.Query().Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
Expand Down
18 changes: 16 additions & 2 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ type Config struct {
Compression string
ClientID string
Credential *security.Credential
// TODO support SASL authentication

SaslScram *security.SaslScram
// control whether to create topic and verify partition number
TopicPreProcess bool
}
Expand All @@ -58,6 +57,7 @@ func NewKafkaConfig() Config {
ReplicationFactor: 1,
Compression: "none",
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
TopicPreProcess: true,
}
}
Expand Down Expand Up @@ -464,5 +464,19 @@ func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
}
}

if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 {
config.Net.SASL.Enable = true
config.Net.SASL.User = c.SaslScram.SaslUser
config.Net.SASL.Password = c.SaslScram.SaslPassword
config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism)
if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} }
} else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} }
} else {
return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512")
}
}

return config, err
}
16 changes: 16 additions & 0 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,22 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
}
_, err = newSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory")

saslConfig := NewKafkaConfig()
saslConfig.Version = "2.6.0"
saslConfig.ClientID = "test-sasl-scram"
saslConfig.SaslScram = &security.SaslScram{
SaslUser: "user",
SaslPassword: "password",
SaslMechanism: sarama.SASLTypeSCRAMSHA256,
}

cfg, err := newSaramaConfigImpl(ctx, saslConfig)
c.Assert(err, check.IsNil)
c.Assert(cfg, check.NotNil)
c.Assert(cfg.Net.SASL.User, check.Equals, "user")
c.Assert(cfg.Net.SASL.Password, check.Equals, "password")
c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256"))
}

func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ require (
github.com/tinylib/msgp v1.1.2
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1
go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
golang.org/x/mod v0.4.1 // indirect
Expand Down
12 changes: 4 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.0.0 h1:XJIw/+VlJ+87J+doOxznsAWIdmWuViOVhkQamW5YV28=
github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
Expand Down Expand Up @@ -211,7 +209,6 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-graphviz v0.0.5/go.mod h1:wXVsXxmyMQU6TN3zGRttjNn3h+iCAS7xQFC6TlNvLhk=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
Expand Down Expand Up @@ -621,7 +618,9 @@ github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1 h1:d71/KA0LhvkrJ/Ok+Wx9qK7bU8meK
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1/go.mod h1:xlngVLeyQ/Qi05oQxhQ+oTuqa03RjMwMfk/7/TCs+QI=
github.com/vmihailenco/tagparser v0.1.1 h1:quXMXlA39OCbd2wAdTsGDlK9RkOk6Wuw+x37wVyIuWY=
github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
Expand All @@ -634,13 +633,12 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547 h1:s71VGheLtWmCYsnNjf+s7XE8HsrZnd3EYGrLGWVm7nY=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547/go.mod h1:YoUyTScD3Vcv2RBm3eGVOq7i1ULiz3OuXoQFWOirmAM=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b h1:3kC4J3eQF6p1UEfQTkC67eEeb3rTk+shQqdX6tFyq9Q=
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b/go.mod h1:yVHk9ub3CSBatqGNg7GRmsnfLWtoW60w4eDYfh7vHDg=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs=
Expand Down Expand Up @@ -668,7 +666,6 @@ go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
Expand All @@ -679,7 +676,6 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
26 changes: 26 additions & 0 deletions pkg/security/sasl_scram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package security

// SaslScram holds necessary path parameter to support sasl-scram
type SaslScram struct {
SaslUser string `toml:"sasl-user" json:"sasl-user"`
SaslPassword string `toml:"sasl-password" json:"sasl-password"`
SaslMechanism string `toml:"sasl-mechanism" json:"sasl-mechanism"`
}

// IsSaslScramEnabled checks whether SASL SCRAM is enabled or not.
func (s *SaslScram) IsSaslScramEnabled() bool {
return len(s.SaslUser) != 0
}
57 changes: 57 additions & 0 deletions pkg/security/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package security

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var (
// SHA256 func
SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
// SHA512 func
SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
)

// XDGSCRAMClient xdg scram client
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

// Begin xdg scram client Begin
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

// Step xdg scram client Step
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

// Done xdg scram client Done
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

0 comments on commit 0a9e759

Please sign in to comment.