Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support endpointParams in pulsar oauth #5073

Merged
merged 3 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features:
- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **Kafka Scaler**: Ability to set upper bound to the number of partitions with lag ([#3997](https://github.com/kedacore/keda/issues/3997))
- **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))
- **Pulsar Scaler**: support endpointParams in pulsar oauth ([#5069](https://github.com/kedacore/keda/issues/5069))

### Fixes

Expand Down
19 changes: 19 additions & 0 deletions pkg/scalers/authentication/authentication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"

Expand Down Expand Up @@ -97,6 +98,12 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
out.Scopes = ParseScope(authParams["scope"])
out.ClientID = authParams["clientID"]
out.ClientSecret = authParams["clientSecret"]

v, err := ParseEndpointParams(authParams["endpointParams"])
if err != nil {
return nil, fmt.Errorf("incorrect value for endpointParams is given: %s", authParams["endpointParams"])
}
out.EndpointParams = v
default:
return nil, fmt.Errorf("incorrect value for authMode is given: %s", t)
}
Expand Down Expand Up @@ -130,6 +137,18 @@ func ParseScope(inputStr string) []string {
return nil
}

// ParseEndpointParams parse OAuth endpoint params from URL-encoded query string.
func ParseEndpointParams(inputStr string) (url.Values, error) {
v, err := url.ParseQuery(inputStr)
if err != nil {
return nil, err
}
if len(v) == 0 {
return nil, nil
}
return v, nil
}

func GetBearerToken(auth *AuthMeta) string {
return fmt.Sprintf("Bearer %s", auth.BearerToken)
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/scalers/authentication/authentication_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package authentication

import "time"
import (
"net/url"
"time"
)

// Type describes the authentication type used in a scaler
type Type string
Expand Down Expand Up @@ -45,11 +48,12 @@ type AuthMeta struct {
CA string

// oAuth2
EnableOAuth bool
OauthTokenURI string
Scopes []string
ClientID string
ClientSecret string
EnableOAuth bool
OauthTokenURI string
Scopes []string
ClientID string
ClientSecret string
EndpointParams url.Values

// custom auth header
EnableCustomAuth bool
Expand Down
16 changes: 12 additions & 4 deletions pkg/scalers/pulsar_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetada
if auth.ClientSecret == "" {
auth.ClientSecret = time.Now().String()
}
if auth.EndpointParams == nil {
v, err := authentication.ParseEndpointParams(config.TriggerMetadata["EndpointParams"])
if err != nil {
return meta, fmt.Errorf("error parsing EndpointParams: %s", config.TriggerMetadata["EndpointParams"])
}
auth.EndpointParams = v
}
}
meta.pulsarAuth = auth
meta.scalerIndex = config.ScalerIndex
Expand All @@ -246,10 +253,11 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) {
client := s.client
if s.metadata.pulsarAuth.EnableOAuth {
config := clientcredentials.Config{
ClientID: s.metadata.pulsarAuth.ClientID,
ClientSecret: s.metadata.pulsarAuth.ClientSecret,
TokenURL: s.metadata.pulsarAuth.OauthTokenURI,
Scopes: s.metadata.pulsarAuth.Scopes,
ClientID: s.metadata.pulsarAuth.ClientID,
ClientSecret: s.metadata.pulsarAuth.ClientSecret,
TokenURL: s.metadata.pulsarAuth.OauthTokenURI,
Scopes: s.metadata.pulsarAuth.Scopes,
EndpointParams: s.metadata.pulsarAuth.EndpointParams,
}
client = config.Client(context.Background())
}
Expand Down
65 changes: 37 additions & 28 deletions pkg/scalers/pulsar_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
"testing"
Expand All @@ -21,21 +22,23 @@ type parsePulsarMetadataTestData struct {
}

type parsePulsarAuthParamsTestData struct {
triggerMetadata map[string]string
authParams map[string]string
isError bool
enableTLS bool
cert string
key string
ca string
bearerToken string
username string
password string
enableOAuth bool
oauthTokenURI string
scope string
clientID string
clientSecret string
triggerMetadata map[string]string
authParams map[string]string
isError bool
enableTLS bool
cert string
key string
ca string
bearerToken string
username string
password string
enableOAuth bool
oauthTokenURI string
scope string
clientID string
clientSecret string
endpointParams string
expectedEndpointParams map[string][]string
}

type pulsarMetricIdentifier struct {
Expand Down Expand Up @@ -79,33 +82,35 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{

var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{
// Passes, mutual TLS, no other auth (legacy "tls: enable")
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil},
// Passes, mutual TLS, no other auth (uses new way to enable tls)
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil},
// Fails, mutual TLS (legacy "tls: enable") without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil},
// Fails, mutual TLS, (uses new way to enable tls) without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil},
// Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS.
// The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", "", "", nil},
// Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", "", "", nil},

// Passes, server side TLS with oauth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123", "", nil},
// Passes, oauth config data is set from metadata only
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", "", "", nil},
// Passes, oauth config data is set from TriggerAuth if both provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123", "", nil},
// Passes, with multiple scopes from metadata
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", "", "", nil},
// Passes, with multiple scopes from TriggerAuth
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123", "", nil},
// Passes, no scope provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "", nil},
// Passes, invalid scopes provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "", nil},
// Passes, with audience provided in endpointParams
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "audience=abc", map[string][]string{"audience": {"abc"}}},
}

var pulsarMetricIdentifiers = []pulsarMetricIdentifier{
Expand Down Expand Up @@ -318,6 +323,10 @@ func TestPulsarOAuthParams(t *testing.T) {
if testData.clientSecret != "" && strings.Compare(meta.pulsarAuth.ClientSecret, testData.clientSecret) != 0 {
t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret)
}

if reflect.DeepEqual(testData.expectedEndpointParams, meta.pulsarAuth.EndpointParams) {
t.Errorf("Expected endpointParams %s but got %s\n", testData.expectedEndpointParams, meta.pulsarAuth.EndpointParams)
}
}
}

Expand Down