Skip to content

Commit

Permalink
Feat: change userName to username in Kafka sasl (#591)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethfoo authored Jul 11, 2023
1 parent a18cfbc commit 9fcbacb
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 4 deletions.
11 changes: 9 additions & 2 deletions pkg/sink/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,18 @@ type RenderTopicFail struct {

type SASL struct {
Type string `yaml:"type,omitempty"`
UserName string `yaml:"userName,omitempty"`
UserName string `yaml:"userName,omitempty"` // Deprecated, use username instead
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
Algorithm string `yaml:"algorithm,omitempty"`
}

func (c *Config) SetDefaults() {
if c.SASL.UserName != "" {
c.SASL.Username = c.SASL.UserName
}
}

func (c *Config) Validate() error {

if err := pattern.Validate(c.Topic); err != nil {
Expand Down Expand Up @@ -112,7 +119,7 @@ func (c *Config) Validate() error {

func (s *SASL) Validate() error {
if s.Type != SASLNoneType {
if s.UserName == "" {
if s.Username == "" {
return fmt.Errorf("kafka sink or source %s sasl with empty user name", s.Type)
}
if s.Password == "" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *Sink) Init(context api.Context) error {

func (s *Sink) Start() error {
c := s.config
mechanism, err := Mechanism(c.SASL.Type, c.SASL.UserName, c.SASL.Password, c.SASL.Algorithm)
mechanism, err := Mechanism(c.SASL.Type, c.SASL.Username, c.SASL.Password, c.SASL.Algorithm)
if err != nil {
log.Error("kafka sink sasl mechanism with error: %s", err.Error())
return err
Expand Down
6 changes: 6 additions & 0 deletions pkg/source/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func getAutoOffset(autoOffsetReset string) int64 {
return kafka.LastOffset
}

func (c *Config) SetDefaults() {
if c.SASL.UserName != "" {
c.SASL.Username = c.SASL.UserName
}
}

func (c *Config) Validate() error {
if c.Topic == "" && len(c.Topics) == 0 {
return errors.New("topic or topics is required")
Expand Down
2 changes: 1 addition & 1 deletion pkg/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (k *Source) Init(context api.Context) error {

func (k *Source) Start() error {
c := k.config
mechanism, err := kafkaSink.Mechanism(c.SASL.Type, c.SASL.UserName, c.SASL.Password, c.SASL.Algorithm)
mechanism, err := kafkaSink.Mechanism(c.SASL.Type, c.SASL.Username, c.SASL.Password, c.SASL.Algorithm)
if err != nil {
log.Error("kafka source sasl mechanism with error: %s", err.Error())
return err
Expand Down

0 comments on commit 9fcbacb

Please sign in to comment.