Skip to content

Commit

Permalink
support endpointParams in pulsar oauth (kedacore#5073)
Browse files Browse the repository at this point in the history
Signed-off-by: Zichao Qi <qizichao@deepmirror.com>
Signed-off-by: anton.lysina <alysina@gmail.com>
  • Loading branch information
qizichao-dm authored and toniiiik committed Jan 15, 2024
1 parent ab2a76e commit 6cba85e
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features:
- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **Kafka Scaler**: Ability to set upper bound to the number of partitions with lag ([#3997](https://github.com/kedacore/keda/issues/3997))
- **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))
- **Pulsar Scaler**: support endpointParams in pulsar oauth ([#5069](https://github.com/kedacore/keda/issues/5069))

### Fixes

Expand Down
19 changes: 19 additions & 0 deletions pkg/scalers/authentication/authentication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"

Expand Down Expand Up @@ -97,6 +98,12 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
out.Scopes = ParseScope(authParams["scope"])
out.ClientID = authParams["clientID"]
out.ClientSecret = authParams["clientSecret"]

v, err := ParseEndpointParams(authParams["endpointParams"])
if err != nil {
return nil, fmt.Errorf("incorrect value for endpointParams is given: %s", authParams["endpointParams"])
}
out.EndpointParams = v
default:
return nil, fmt.Errorf("incorrect value for authMode is given: %s", t)
}
Expand Down Expand Up @@ -130,6 +137,18 @@ func ParseScope(inputStr string) []string {
return nil
}

// ParseEndpointParams parse OAuth endpoint params from URL-encoded query string.
func ParseEndpointParams(inputStr string) (url.Values, error) {
v, err := url.ParseQuery(inputStr)
if err != nil {
return nil, err
}
if len(v) == 0 {
return nil, nil
}
return v, nil
}

func GetBearerToken(auth *AuthMeta) string {
return fmt.Sprintf("Bearer %s", auth.BearerToken)
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/scalers/authentication/authentication_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package authentication

import "time"
import (
"net/url"
"time"
)

// Type describes the authentication type used in a scaler
type Type string
Expand Down Expand Up @@ -45,11 +48,12 @@ type AuthMeta struct {
CA string

// oAuth2
EnableOAuth bool
OauthTokenURI string
Scopes []string
ClientID string
ClientSecret string
EnableOAuth bool
OauthTokenURI string
Scopes []string
ClientID string
ClientSecret string
EndpointParams url.Values

// custom auth header
EnableCustomAuth bool
Expand Down
16 changes: 12 additions & 4 deletions pkg/scalers/pulsar_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetada
if auth.ClientSecret == "" {
auth.ClientSecret = time.Now().String()
}
if auth.EndpointParams == nil {
v, err := authentication.ParseEndpointParams(config.TriggerMetadata["EndpointParams"])
if err != nil {
return meta, fmt.Errorf("error parsing EndpointParams: %s", config.TriggerMetadata["EndpointParams"])
}
auth.EndpointParams = v
}
}
meta.pulsarAuth = auth
meta.scalerIndex = config.ScalerIndex
Expand All @@ -246,10 +253,11 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) {
client := s.client
if s.metadata.pulsarAuth.EnableOAuth {
config := clientcredentials.Config{
ClientID: s.metadata.pulsarAuth.ClientID,
ClientSecret: s.metadata.pulsarAuth.ClientSecret,
TokenURL: s.metadata.pulsarAuth.OauthTokenURI,
Scopes: s.metadata.pulsarAuth.Scopes,
ClientID: s.metadata.pulsarAuth.ClientID,
ClientSecret: s.metadata.pulsarAuth.ClientSecret,
TokenURL: s.metadata.pulsarAuth.OauthTokenURI,
Scopes: s.metadata.pulsarAuth.Scopes,
EndpointParams: s.metadata.pulsarAuth.EndpointParams,
}
client = config.Client(context.Background())
}
Expand Down
65 changes: 37 additions & 28 deletions pkg/scalers/pulsar_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
"testing"
Expand All @@ -21,21 +22,23 @@ type parsePulsarMetadataTestData struct {
}

type parsePulsarAuthParamsTestData struct {
triggerMetadata map[string]string
authParams map[string]string
isError bool
enableTLS bool
cert string
key string
ca string
bearerToken string
username string
password string
enableOAuth bool
oauthTokenURI string
scope string
clientID string
clientSecret string
triggerMetadata map[string]string
authParams map[string]string
isError bool
enableTLS bool
cert string
key string
ca string
bearerToken string
username string
password string
enableOAuth bool
oauthTokenURI string
scope string
clientID string
clientSecret string
endpointParams string
expectedEndpointParams map[string][]string
}

type pulsarMetricIdentifier struct {
Expand Down Expand Up @@ -79,33 +82,35 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{

var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{
// Passes, mutual TLS, no other auth (legacy "tls: enable")
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil},
// Passes, mutual TLS, no other auth (uses new way to enable tls)
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil},
// Fails, mutual TLS (legacy "tls: enable") without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil},
// Fails, mutual TLS, (uses new way to enable tls) without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", "", "", nil},
// Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS.
// The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", "", "", nil},
// Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", "", "", nil},

// Passes, server side TLS with oauth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123", "", nil},
// Passes, oauth config data is set from metadata only
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", "", "", nil},
// Passes, oauth config data is set from TriggerAuth if both provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123", "", nil},
// Passes, with multiple scopes from metadata
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", "", "", nil},
// Passes, with multiple scopes from TriggerAuth
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123", "", nil},
// Passes, no scope provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "", nil},
// Passes, invalid scopes provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "", nil},
// Passes, with audience provided in endpointParams
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "audience=abc", map[string][]string{"audience": {"abc"}}},
}

var pulsarMetricIdentifiers = []pulsarMetricIdentifier{
Expand Down Expand Up @@ -318,6 +323,10 @@ func TestPulsarOAuthParams(t *testing.T) {
if testData.clientSecret != "" && strings.Compare(meta.pulsarAuth.ClientSecret, testData.clientSecret) != 0 {
t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret)
}

if reflect.DeepEqual(testData.expectedEndpointParams, meta.pulsarAuth.EndpointParams) {
t.Errorf("Expected endpointParams %s but got %s\n", testData.expectedEndpointParams, meta.pulsarAuth.EndpointParams)
}
}
}

Expand Down

0 comments on commit 6cba85e

Please sign in to comment.