Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SASL/OAUTHBEARER #230

Merged
merged 3 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/reference-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ kafka:
username: ""
# Password to use for PLAIN or SCRAM mechanism
password: ""
# Mechanism to use for SASL Authentication. Valid values are PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
# Mechanism to use for SASL Authentication. Valid values are PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER
mechanism: "PLAIN"
# GSSAPI / Kerberos config properties
gssapi:
Expand All @@ -58,6 +58,12 @@ kafka:
password: ""
realm: ""
enableFast: true
# OAUTHBEARER config properties
oauth:
tokenEndpoint: ""
clientId: ""
clientSecret: ""
scope: ""

minion:
consumerGroups:
Expand Down
14 changes: 14 additions & 0 deletions kafka/client_config_helper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/twmb/franz-go/pkg/kversion"
"github.com/twmb/franz-go/pkg/sasl"
"github.com/twmb/franz-go/pkg/sasl/kerberos"
"github.com/twmb/franz-go/pkg/sasl/oauth"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"go.uber.org/zap"
Expand Down Expand Up @@ -108,6 +110,18 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
}.AsMechanism()
opts = append(opts, kgo.SASL(kerberosMechanism))
}

// OAuthBearer
if cfg.SASL.Mechanism == "OAUTHBEARER" {
mechanism := oauth.Oauth(func(ctx context.Context) (oauth.Auth, error) {
token, err := cfg.SASL.OAuthBearer.getToken(ctx)
return oauth.Auth{
Zid: cfg.SASL.OAuthBearer.ClientID,
Token: token,
}, err
})
opts = append(opts, kgo.SASL(mechanism))
}
}

// Configure TLS
Expand Down
5 changes: 3 additions & 2 deletions kafka/config_sasl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type SASLConfig struct {
Mechanism string `koanf:"mechanism"`

// SASL Mechanisms that require more configuration than username & password
GSSAPI SASLGSSAPIConfig `koanf:"gssapi"`
GSSAPI SASLGSSAPIConfig `koanf:"gssapi"`
OAuthBearer OAuthBearerConfig `koanf:"oauth"`
}

// SetDefaults for SASL Config
Expand All @@ -38,7 +39,7 @@ func (c *SASLConfig) Validate() error {
case SASLMechanismPlain, SASLMechanismScramSHA256, SASLMechanismScramSHA512, SASLMechanismGSSAPI:
// Valid and supported
case SASLMechanismOAuthBearer:
return fmt.Errorf("sasl mechanism '%v' is valid but not yet supported. Please submit an issue if you need it", c.Mechanism)
return c.OAuthBearer.Validate()
default:
return fmt.Errorf("given sasl mechanism '%v' is invalid", c.Mechanism)
}
Expand Down
73 changes: 73 additions & 0 deletions kafka/config_sasl_oauthbearer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package kafka

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
)

type OAuthBearerConfig struct {
TokenEndpoint string `koanf:"tokenEndpoint"`
ClientID string `koanf:"clientId"`
ClientSecret string `koanf:"clientSecret"`
Scope string `koanf:"scope"`
}

func (c *OAuthBearerConfig) Validate() error {
if c.TokenEndpoint == "" {
return fmt.Errorf("OAuthBearer token endpoint is not specified")
}
if c.ClientID == "" || c.ClientSecret == "" {
return fmt.Errorf("OAuthBearer client credentials are not specified")
}
return nil
}

// same as AcquireToken in Console https://github.com/redpanda-data/console/blob/master/backend/pkg/config/kafka_sasl_oauth.go#L56
func (c *OAuthBearerConfig) getToken(ctx context.Context) (string, error) {
authHeaderValue := base64.StdEncoding.EncodeToString([]byte(c.ClientID + ":" + c.ClientSecret))

queryParams := url.Values{
"grant_type": []string{"client_credentials"},
"scope": []string{c.Scope},
}

req, err := http.NewRequestWithContext(ctx, "POST", c.TokenEndpoint, strings.NewReader(queryParams.Encode()))
if err != nil {
return "", fmt.Errorf("failed to create HTTP request: %w", err)
}

req.URL.RawQuery = queryParams.Encode()

req.Header.Set("Authorization", "Basic "+authHeaderValue)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

client := &http.Client{}

resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("HTTP request failed: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("token request failed with status code %d", resp.StatusCode)
}

var tokenResponse map[string]interface{}
decoder := json.NewDecoder(resp.Body)
if err := decoder.Decode(&tokenResponse); err != nil {
return "", fmt.Errorf("failed to parse token response: %w", err)
}

accessToken, ok := tokenResponse["access_token"].(string)
if !ok {
return "", fmt.Errorf("access_token not found in token response")
}

return accessToken, nil
}