Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support OAuth for pulsar scaler #4709

Merged
merged 12 commits into from
Sep 17, 2023
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Updated AWS SDK and updated all the aws scalers ([#4905](https://github.com/kedacore/keda/issues/4905))
- **Azure Pod Identity**: Introduce validation to prevent usage of empty identity ID for Azure identity providers ([#4528](https://github.com/kedacore/keda/issues/4528))
- **Prometheus Scaler**: Remove trailing whitespaces in customAuthHeader and customAuthValue ([#4960](https://github.com/kedacore/keda/issues/4960))
- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700))

### Fixes
- **RabbitMQ Scaler**: Allow subpaths along with vhost in connection string ([#2634](https://github.com/kedacore/keda/issues/2634))
Expand All @@ -84,9 +85,9 @@ New deprecation(s):
### Other

- **General**: Fixed a typo in the StatefulSet scaling resolver ([#4902](https://github.com/kedacore/keda/pull/4902))
- **General**: In Metrics server show only logs with a severity level of ERROR or higher in the stderr ([#4049](https://github.com/kedacore/keda/issues/4049))
- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781))
- **General**: Replace deprecated `set-output` command with environment file ([#4914](https://github.com/kedacore/keda/issues/4914))
- **General**: In Metrics server show only logs with a severity level of ERROR or higher in the stderr ([#4049](https://github.com/kedacore/keda/issues/4049))

## v2.11.2

Expand Down
42 changes: 40 additions & 2 deletions pkg/scalers/authentication/authentication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
if out.EnableBasicAuth {
return nil, errors.New("both bearer and basic authentication can not be set")
}

out.BearerToken = authParams["bearerToken"]
if out.EnableOAuth {
return nil, errors.New("both bearer and OAuth can not be set")
}
out.BearerToken = strings.TrimSuffix(authParams["bearerToken"], "\n")
out.EnableBearerAuth = true
case BasicAuthType:
if len(authParams["username"]) == 0 {
Expand All @@ -51,6 +53,9 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
if out.EnableBearerAuth {
return nil, errors.New("both bearer and basic authentication can not be set")
}
if out.EnableOAuth {
return nil, errors.New("both bearer and OAuth can not be set")
}

out.Username = authParams["username"]
// password is optional. For convenience, many application implement basic auth with
Expand Down Expand Up @@ -80,6 +85,18 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
}
out.CustomAuthValue = strings.TrimSuffix(authParams["customAuthValue"], "\n")
out.EnableCustomAuth = true
case OAuthType:
if out.EnableBasicAuth {
return nil, errors.New("both oauth and basic authentication can not be set")
}
if out.EnableBearerAuth {
return nil, errors.New("both oauth and bearer authentication can not be set")
}
out.EnableOAuth = true
out.OauthTokenURI = authParams["oauthTokenURI"]
out.Scopes = ParseScope(authParams["scope"])
out.ClientID = authParams["clientID"]
out.ClientSecret = authParams["clientSecret"]
default:
return nil, fmt.Errorf("incorrect value for authMode is given: %s", t)
}
Expand All @@ -92,6 +109,27 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
return out, err
}

// ParseScope parse OAuth scopes from a comma separated string
// whitespace is trimmed
func ParseScope(inputStr string) []string {
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
scope := strings.TrimSpace(inputStr)
if scope != "" {
scopes := make([]string, 0)
list := strings.Split(scope, ",")
for _, sc := range list {
sc := strings.TrimSpace(sc)
if sc != "" {
scopes = append(scopes, sc)
}
}
if len(scopes) == 0 {
return nil
}
return scopes
}
return nil
}

func GetBearerToken(auth *AuthMeta) string {
return fmt.Sprintf("Bearer %s", auth.BearerToken)
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/scalers/authentication/authentication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
BearerAuthType Type = "bearer"
// CustomAuthType is an auth type using a custom header
CustomAuthType Type = "custom"
// OAuthType is an auth type using a oAuth2
OAuthType Type = "oauth"
)

// TransportType is type of http transport
Expand All @@ -42,6 +44,13 @@ type AuthMeta struct {
Key string
CA string

// oAuth2
EnableOAuth bool
OauthTokenURI string
Scopes []string
ClientID string
ClientSecret string

// custom auth header
EnableCustomAuth bool
CustomAuthHeader string
Expand Down
31 changes: 30 additions & 1 deletion pkg/scalers/pulsar_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"net/http"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
"golang.org/x/oauth2/clientcredentials"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/external_metrics"
Expand Down Expand Up @@ -211,6 +213,23 @@ func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetada
if err != nil {
return meta, fmt.Errorf("error parsing %s: %w", msgBacklogMetricName, err)
}

if auth != nil && auth.EnableOAuth {
if auth.OauthTokenURI == "" {
auth.OauthTokenURI = config.TriggerMetadata["oauthTokenURI"]
}
if auth.Scopes == nil {
auth.Scopes = authentication.ParseScope(config.TriggerMetadata["scope"])
}
if auth.ClientID == "" {
auth.ClientID = config.TriggerMetadata["clientID"]
}
// client_secret is not required for mtls OAuth(RFC8705)
// set secret to random string to work around the Go OAuth lib
if auth.ClientSecret == "" {
auth.ClientSecret = time.Now().String()
}
}
meta.pulsarAuth = auth
meta.scalerIndex = config.ScalerIndex
return meta, nil
Expand All @@ -224,9 +243,19 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) {
return nil, fmt.Errorf("error requesting stats from admin url: %w", err)
}

client := s.client
if s.metadata.pulsarAuth.EnableOAuth {
config := clientcredentials.Config{
ClientID: s.metadata.pulsarAuth.ClientID,
ClientSecret: s.metadata.pulsarAuth.ClientSecret,
TokenURL: s.metadata.pulsarAuth.OauthTokenURI,
Scopes: s.metadata.pulsarAuth.Scopes,
}
client = config.Client(context.Background())
}
addAuthHeaders(req, &s.metadata)

res, err := s.client.Do(req)
res, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("error requesting stats from admin url: %w", err)
}
Expand Down
104 changes: 97 additions & 7 deletions pkg/scalers/pulsar_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type parsePulsarAuthParamsTestData struct {
bearerToken string
username string
password string
enableOAuth bool
oauthTokenURI string
scope string
clientID string
clientSecret string
}

type pulsarMetricIdentifier struct {
Expand Down Expand Up @@ -74,18 +79,33 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{

var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{
// Passes, mutual TLS, no other auth (legacy "tls: enable")
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Passes, mutual TLS, no other auth (uses new way to enable tls)
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Fails, mutual TLS (legacy "tls: enable") without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Fails, mutual TLS, (uses new way to enable tls) without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS.
// The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", ""},
// Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", ""},

// Passes, server side TLS with oauth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123"},
// Passes, oauth config data is set from metadata only
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", ""},
// Passes, oauth config data is set from TriggerAuth if both provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"},
// Passes, with multiple scopes from metadata
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""},
// Passes, with multiple scopes from TriggerAuth
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"},
// Passes, no scope provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
// Passes, invalid scopes provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
}

var pulsarMetricIdentifiers = []pulsarMetricIdentifier{
Expand Down Expand Up @@ -176,6 +196,20 @@ func TestParsePulsarMetadata(t *testing.T) {
}
}

func compareScope(scopes []string, scopeStr string) bool {
scopeMap := make(map[string]bool)
for _, scope := range scopes {
scopeMap[scope] = true
}
scopeList := strings.Fields(scopeStr)
for _, scope := range scopeList {
if !scopeMap[scope] {
return false
}
}
return true
}

func TestPulsarAuthParams(t *testing.T) {
for _, testData := range parsePulsarMetadataTestAuthTLSDataset {
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, "test_pulsar_scaler")
Expand Down Expand Up @@ -218,7 +252,12 @@ func TestPulsarAuthParams(t *testing.T) {
}

if meta.pulsarAuth.EnableBasicAuth != (testData.username != "" || testData.password != "") {
t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken)
if testData.username != "" {
t.Errorf("Expected EnableBasicAuth to be true when username is %s\n", testData.username)
}
if testData.password != "" {
t.Errorf("Expected EnableBasicAuth to be true when password is %s\n", testData.password)
}
}

if meta.pulsarAuth.Username != testData.username {
Expand All @@ -231,6 +270,57 @@ func TestPulsarAuthParams(t *testing.T) {
}
}

func TestPulsarOAuthParams(t *testing.T) {
for _, testData := range parsePulsarMetadataTestAuthTLSDataset {
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, "test_pulsar_scaler")
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, logger)

if err != nil && !testData.isError {
t.Error("Expected success but got error", testData.authParams, err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}

if meta.pulsarAuth == nil {
t.Log("meta.pulsarAuth is nil, skipping rest of validation of", testData)
continue
}

if meta.pulsarAuth.EnableOAuth != (testData.clientID != "" || testData.clientSecret != "") {
if testData.clientID != "" {
t.Errorf("Expected EnableOAuth to be true when clientID is %s\n", testData.clientID)
}
if testData.clientSecret != "" {
t.Errorf("Expected EnableOAuth to be true when clientSecret is %s\n", testData.clientSecret)
}
}

if meta.pulsarAuth.OauthTokenURI != testData.oauthTokenURI {
t.Errorf("Expected oauthTokenURI to be set to %s but got %s\n", testData.oauthTokenURI, meta.pulsarAuth.OauthTokenURI)
}

if testData.scope != "" && !compareScope(meta.pulsarAuth.Scopes, testData.scope) {
t.Errorf("Expected scopes %s but got %s\n", testData.scope, meta.pulsarAuth.Scopes)
}
if testData.scope == "" && meta.pulsarAuth.Scopes != nil {
t.Errorf("Expected scopes to be null but got %s\n", meta.pulsarAuth.Scopes)
}

if meta.pulsarAuth.ClientID != testData.clientID {
t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.pulsarAuth.ClientID)
}

if meta.pulsarAuth.EnableOAuth && meta.pulsarAuth.ClientSecret == "" {
t.Errorf("Expected clientSecret not to be empty.\n")
}

if testData.clientSecret != "" && strings.Compare(meta.pulsarAuth.ClientSecret, testData.clientSecret) != 0 {
t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret)
}
}
}

func TestPulsarGetMetricSpecForScaling(t *testing.T) {
for _, testData := range pulsarMetricIdentifiers {
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validWithAuthParams}, "test_pulsar_scaler")
Expand Down