Skip to content

Commit

Permalink
MGMT-14979: add sasl/scraml auth method for kafka notifications
Browse files Browse the repository at this point in the history
Signed-off-by: Riccardo piccoli <rpiccoli@redhat.com>
  • Loading branch information
rccrdpccl committed Jun 20, 2023
1 parent db09755 commit bcae8cc
Show file tree
Hide file tree
Showing 29 changed files with 4,886 additions and 9 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ require (
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/russross/blackfriday v1.6.0 // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
github.com/xlab/treeprint v1.1.0 // indirect
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
k8s.io/cli-runtime v0.25.4 // indirect
Expand Down
4 changes: 4 additions & 0 deletions openshift/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ parameters:
value: "15s"
- name: SVC_TARGET_PORT
value: "8900"
- name: KAFKA_SASL_MECHANISM
value: "PLAIN"
- name: KAFKA_CREDENTIALS_SECRET_NAME
required: false
value: assisted-installer-event-stream
Expand Down Expand Up @@ -345,6 +347,8 @@ objects:
value: ${ENABLE_EVENT_STREAMING}
- name: KAFKA_EVENT_STREAM_TOPIC
value: ${KAFKA_EVENT_STREAM_TOPIC}
- name: KAFKA_SASL_MECHANISM
value: ${KAFKA_SASL_MECHANISM}
- name: NAMESPACE
valueFrom:
fieldRef:
Expand Down
47 changes: 38 additions & 9 deletions pkg/kafka/json_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@ import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/kelseyhightower/envconfig"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/compress"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
)

const (
saslPlain string = "PLAIN"
saslScram string = "SCRAM"

WriteTimeout time.Duration = 5 * time.Second
)

Expand All @@ -25,6 +32,7 @@ type Producer interface {
}

type Config struct {
SaslMechanism string `envconfig:"KAFKA_SASL_MECHANISM" default:"PLAIN"`
BootstrapServer string `envconfig:"KAFKA_BOOTSTRAP_SERVER" required:"true"`
ClientID string `envconfig:"KAFKA_CLIENT_ID" default:""`
ClientSecret string `envconfig:"KAFKA_CLIENT_SECRET" default:""`
Expand All @@ -35,28 +43,46 @@ type JSONWriter struct {
producer Producer
}

func newProducer(config *Config) Producer {
func getMechanism(config *Config) (sasl.Mechanism, error) {
if config.ClientID == "" || config.ClientSecret == "" {
// no credentials set, possibly using unauthenticated connection
return nil, nil
}
if config.SaslMechanism == saslPlain {
return &plain.Mechanism{
Username: config.ClientID,
Password: config.ClientSecret,
}, nil
}
if config.SaslMechanism == saslScram {
return scram.Mechanism(scram.SHA512, config.ClientID, config.ClientSecret)

}
return nil, fmt.Errorf("sasl mechanism %s is not valid", config.SaslMechanism)
}

func newProducer(config *Config) (Producer, error) {
brokers := strings.Split(config.BootstrapServer, ",")
writer := &kafka.Writer{
Addr: kafka.TCP(config.BootstrapServer),
Addr: kafka.TCP(brokers...),
Topic: config.Topic,
Balancer: &kafka.ReferenceHash{},
Compression: compress.Gzip,
Async: true,
WriteTimeout: WriteTimeout,
}
if config.ClientID != "" && config.ClientSecret != "" {
mechanism := &plain.Mechanism{
Username: config.ClientID,
Password: config.ClientSecret,
}
mechanism, err := getMechanism(config)
if err != nil {
return writer, err
}
if mechanism != nil {
writer.Transport = &kafka.Transport{
SASL: mechanism,
// let config pick default root CA, but define it to force TLS
TLS: &tls.Config{},
}
}
return writer
return writer, nil
}

func NewWriter() (*JSONWriter, error) {
Expand All @@ -66,7 +92,10 @@ func NewWriter() (*JSONWriter, error) {
return nil, err
}

p := newProducer(config)
p, err := newProducer(config)
if err != nil {
return nil, err
}
return &JSONWriter{
producer: p,
}, nil
Expand Down
91 changes: 91 additions & 0 deletions vendor/github.com/segmentio/kafka-go/sasl/scram/scram.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file.
11 changes: 11 additions & 0 deletions vendor/github.com/xdg/scram/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit bcae8cc

Please sign in to comment.