From 8d62f14ddc383055c192793ca9e1982d45f565d0 Mon Sep 17 00:00:00 2001 From: Zichao Qi Date: Thu, 12 Oct 2023 11:09:52 +0800 Subject: [PATCH] support endpointParams in pulsar oauth Signed-off-by: Zichao Qi --- CHANGELOG.md | 1 + .../authentication/authentication_helpers.go | 20 ++++++ .../authentication/authentication_types.go | 16 +++-- pkg/scalers/pulsar_scaler.go | 8 +++ pkg/scalers/pulsar_scaler_test.go | 65 +++++++++++-------- 5 files changed, 76 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fce31a17da..a6862aa0798 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features: - **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) - **Kafka Scaler**: Ability to set upper bound to the number of partitions with lag ([#3997](https://github.com/kedacore/keda/issues/3997)) - **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) +- **Pulsar Scaler**: support endpointParams in pulsar oauth ([#5069](https://github.com/kedacore/keda/issues/5069)) ### Fixes diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index e56485e659c..8e92c21f7ca 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/http" + "net/url" "strings" "time" @@ -97,6 +98,12 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet out.Scopes = ParseScope(authParams["scope"]) out.ClientID = authParams["clientID"] out.ClientSecret = authParams["clientSecret"] + + v, err := ParseEndpointParams(authParams["endpointParams"]) + if err != nil { + return nil, fmt.Errorf("incorrect value for endpointParams is given: %s", authParams["endpointParams"]) + } + out.EndpointParams = v default: return nil, fmt.Errorf("incorrect value for authMode is given: %s", t) } @@ -130,6 +137,19 @@ func ParseScope(inputStr string) []string { return nil } +// ParseEndpointParams parse OAuth endpoint params from URL-encoded query string. +func ParseEndpointParams(inputStr string) (url.Values, error) { + v, err := url.ParseQuery(inputStr) + if err != nil { + return nil, err + } + if len(v) == 0 { + return nil, nil + } else { + return v, nil + } +} + func GetBearerToken(auth *AuthMeta) string { return fmt.Sprintf("Bearer %s", auth.BearerToken) } diff --git a/pkg/scalers/authentication/authentication_types.go b/pkg/scalers/authentication/authentication_types.go index 66dc12b677c..09894572775 100644 --- a/pkg/scalers/authentication/authentication_types.go +++ b/pkg/scalers/authentication/authentication_types.go @@ -1,6 +1,9 @@ package authentication -import "time" +import ( + "net/url" + "time" +) // Type describes the authentication type used in a scaler type Type string @@ -45,11 +48,12 @@ type AuthMeta struct { CA string // oAuth2 - EnableOAuth bool - OauthTokenURI string - Scopes []string - ClientID string - ClientSecret string + EnableOAuth bool + OauthTokenURI string + Scopes []string + ClientID string + ClientSecret string + EndpointParams url.Values // custom auth header EnableCustomAuth bool diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index 4e20320a904..8400ec2f299 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -229,6 +229,13 @@ func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetada if auth.ClientSecret == "" { auth.ClientSecret = time.Now().String() } + if auth.EndpointParams == nil { + v, err := authentication.ParseEndpointParams(config.TriggerMetadata["EndpointParams"]) + if err != nil { + return meta, fmt.Errorf("error parsing EndpointParams: %s", config.TriggerMetadata["EndpointParams"]) + } + auth.EndpointParams = v + } } meta.pulsarAuth = auth meta.scalerIndex = config.ScalerIndex @@ -250,6 +257,7 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) { ClientSecret: s.metadata.pulsarAuth.ClientSecret, TokenURL: s.metadata.pulsarAuth.OauthTokenURI, Scopes: s.metadata.pulsarAuth.Scopes, + EndpointParams: s.metadata.pulsarAuth.EndpointParams, } client = config.Client(context.Background()) } diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index 8fc953e5332..6c742bf2375 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "reflect" "testing" "github.com/go-logr/logr" @@ -21,21 +22,23 @@ type parsePulsarMetadataTestData struct { } type parsePulsarAuthParamsTestData struct { - triggerMetadata map[string]string - authParams map[string]string - isError bool - enableTLS bool - cert string - key string - ca string - bearerToken string - username string - password string - enableOAuth bool - oauthTokenURI string - scope string - clientID string - clientSecret string + triggerMetadata map[string]string + authParams map[string]string + isError bool + enableTLS bool + cert string + key string + ca string + bearerToken string + username string + password string + enableOAuth bool + oauthTokenURI string + scope string + clientID string + clientSecret string + endpointParams string + expectedEndpointParams map[string][]string } type pulsarMetricIdentifier struct { @@ -79,33 +82,35 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{ var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{ // Passes, mutual TLS, no other auth (legacy "tls: enable") - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil}, // Passes, mutual TLS, no other auth (uses new way to enable tls) - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil}, // Fails, mutual TLS (legacy "tls: enable") without cert - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil}, // Fails, mutual TLS, (uses new way to enable tls) without cert - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil}, // Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS. // The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", "", "", nil}, // Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", "", "", nil}, // Passes, server side TLS with oauth. Note that EnableTLS is expected to be false because it is not mTLS. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123", "", nil}, // Passes, oauth config data is set from metadata only - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", "", "", nil}, // Passes, oauth config data is set from TriggerAuth if both provided - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123", "", nil}, // Passes, with multiple scopes from metadata - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", "", "", nil}, // Passes, with multiple scopes from TriggerAuth - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123", "", nil}, // Passes, no scope provided - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "", nil}, // Passes, invalid scopes provided - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "", nil}, + // Passes, with audience provided in endpointParams. + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "audience=abc", map[string][]string{"audience": []string{"abc"}}}, } var pulsarMetricIdentifiers = []pulsarMetricIdentifier{ @@ -318,6 +323,10 @@ func TestPulsarOAuthParams(t *testing.T) { if testData.clientSecret != "" && strings.Compare(meta.pulsarAuth.ClientSecret, testData.clientSecret) != 0 { t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret) } + + if reflect.DeepEqual(testData.expectedEndpointParams, meta.pulsarAuth.EndpointParams) { + t.Errorf("Expected endpointParams %s but got %s\n", testData.expectedEndpointParams, meta.pulsarAuth.EndpointParams) + } } }