From 2040b7da78f131ea829e6d080d802407676009d3 Mon Sep 17 00:00:00 2001 From: "anton.lysina" Date: Thu, 10 Aug 2023 10:27:39 +0200 Subject: [PATCH] fix auth header for bearer authtype, add tests --- pkg/scalers/azure_pipelines_scaler.go | 105 +++--- .../azure_pipelines_aad_wi_test.go | 317 ++++++++++++++++++ 2 files changed, 374 insertions(+), 48 deletions(-) create mode 100644 tests/scalers/azure/azure_pipelines_aad_wi/azure_pipelines_aad_wi_test.go diff --git a/pkg/scalers/azure_pipelines_scaler.go b/pkg/scalers/azure_pipelines_scaler.go index fd6a689540f..b62c828277b 100644 --- a/pkg/scalers/azure_pipelines_scaler.go +++ b/pkg/scalers/azure_pipelines_scaler.go @@ -120,17 +120,17 @@ type azurePipelinesPoolIDResponse struct { } type azurePipelinesScaler struct { - metricType v2.MetricTargetType - metadata *azurePipelinesMetadata - httpClient *http.Client - logger logr.Logger + metricType v2.MetricTargetType + metadata *azurePipelinesMetadata + httpClient *http.Client + podIdentity kedav1alpha1.AuthPodIdentity + logger logr.Logger } type azurePipelinesMetadata struct { organizationURL string organizationName string personalAccessToken string - podIdentityProvider kedav1alpha1.PodIdentityProvider parent string demands string poolID int @@ -145,48 +145,45 @@ type azurePipelinesMetadata struct { func NewAzurePipelinesScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) { httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) + logger := InitializeLogger(config, "azure_pipelines_scaler") metricType, err := GetMetricTargetType(config) if err != nil { return nil, fmt.Errorf("error getting scaler metric type: %w", err) } - meta, err := parseAzurePipelinesMetadata(ctx, config, httpClient) + meta, podIdentity, err := parseAzurePipelinesMetadata(ctx, logger, config, httpClient) if err != nil { return nil, fmt.Errorf("error parsing azure Pipelines metadata: %w", err) } return &azurePipelinesScaler{ - metricType: metricType, - metadata: meta, - httpClient: httpClient, - logger: InitializeLogger(config, "azure_pipelines_scaler"), + metricType: metricType, + metadata: meta, + httpClient: httpClient, + podIdentity: podIdentity, + logger: logger, }, nil } -func parseAzureDevOpsAuthMethod(config *ScalerConfig, metadata *azurePipelinesMetadata) error { - if val, ok := config.AuthParams["personalAccessToken"]; ok && val != "" { - // Found the personalAccessToken in a parameter from TriggerAuthentication - metadata.personalAccessToken = config.AuthParams["personalAccessToken"] - } else if val, ok := config.TriggerMetadata["personalAccessTokenFromEnv"]; ok && val != "" { - metadata.personalAccessToken = config.ResolvedEnv[config.TriggerMetadata["personalAccessTokenFromEnv"]] - } else if config.PodIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload { - //use workload identity - metadata.podIdentityProvider = config.PodIdentity.Provider - } else { - return fmt.Errorf("no personalAccessToken given or PodIdentity provider configured") +func getAuthPodIdentity(config *ScalerConfig) (kedav1alpha1.AuthPodIdentity, error) { + switch config.PodIdentity.Provider { + case "", kedav1alpha1.PodIdentityProviderNone: + return kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, nil + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: + return kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzureWorkload}, nil + default: + return kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not 
supported for azure storage blobs", config.PodIdentity) } - - return nil } -func parseAzurePipelinesMetadata(ctx context.Context, config *ScalerConfig, httpClient *http.Client) (*azurePipelinesMetadata, error) { +func parseAzurePipelinesMetadata(ctx context.Context, logger logr.Logger, config *ScalerConfig, httpClient *http.Client) (*azurePipelinesMetadata, kedav1alpha1.AuthPodIdentity, error) { meta := azurePipelinesMetadata{} meta.targetPipelinesQueueLength = defaultTargetPipelinesQueueLength if val, ok := config.TriggerMetadata["targetPipelinesQueueLength"]; ok { queueLength, err := strconv.ParseInt(val, 10, 64) if err != nil { - return nil, fmt.Errorf("error parsing azure pipelines metadata targetPipelinesQueueLength: %w", err) + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure pipelines metadata targetPipelinesQueueLength: %w", err) } meta.targetPipelinesQueueLength = queueLength @@ -196,7 +193,7 @@ func parseAzurePipelinesMetadata(ctx context.Context, config *ScalerConfig, http if val, ok := config.TriggerMetadata["activationTargetPipelinesQueueLength"]; ok { activationQueueLength, err := strconv.ParseInt(val, 10, 64) if err != nil { - return nil, fmt.Errorf("error parsing azure pipelines metadata activationTargetPipelinesQueueLength: %w", err) + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure pipelines metadata activationTargetPipelinesQueueLength: %w", err) } meta.activationTargetPipelinesQueueLength = activationQueueLength @@ -208,18 +205,27 @@ func parseAzurePipelinesMetadata(ctx context.Context, config *ScalerConfig, http } else if val, ok := config.TriggerMetadata["organizationURLFromEnv"]; ok && val != "" { meta.organizationURL = config.ResolvedEnv[val] } else { - return nil, fmt.Errorf("no organizationURL given") + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no organizationURL given") } if val := meta.organizationURL[strings.LastIndex(meta.organizationURL, "/")+1:]; val != "" { meta.organizationName = meta.organizationURL[strings.LastIndex(meta.organizationURL, "/")+1:] } else { - return nil, fmt.Errorf("failed to extract organization name from organizationURL") + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("failed to extract organization name from organizationURL") } - err := parseAzureDevOpsAuthMethod(config, &meta) + podIdentity, err := getAuthPodIdentity(config) if err != nil { - return nil, err + return nil, kedav1alpha1.AuthPodIdentity{}, err + } + + if val, ok := config.AuthParams["personalAccessToken"]; ok && val != "" { + // Found the personalAccessToken in a parameter from TriggerAuthentication + meta.personalAccessToken = config.AuthParams["personalAccessToken"] + } else if val, ok := config.TriggerMetadata["personalAccessTokenFromEnv"]; ok && val != "" { + meta.personalAccessToken = config.ResolvedEnv[config.TriggerMetadata["personalAccessTokenFromEnv"]] + } else if podIdentity.Provider == kedav1alpha1.PodIdentityProviderNone { + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no personalAccessToken given or PodIdentity provider configured") } if val, ok := config.TriggerMetadata["parent"]; ok && val != "" { @@ -238,7 +244,7 @@ func parseAzurePipelinesMetadata(ctx context.Context, config *ScalerConfig, http if val, ok := config.TriggerMetadata["jobsToFetch"]; ok && val != "" { jobsToFetch, err := strconv.ParseInt(val, 10, 64) if err != nil { - return nil, fmt.Errorf("error parsing jobsToFetch: %w", err) + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing 
jobsToFetch: %w", err) } meta.jobsToFetch = jobsToFetch } @@ -247,41 +253,41 @@ func parseAzurePipelinesMetadata(ctx context.Context, config *ScalerConfig, http if val, ok := config.TriggerMetadata["requireAllDemands"]; ok && val != "" { requireAllDemands, err := strconv.ParseBool(val) if err != nil { - return nil, err + return nil, kedav1alpha1.AuthPodIdentity{}, err } meta.requireAllDemands = requireAllDemands } if val, ok := config.TriggerMetadata["poolName"]; ok && val != "" { var err error - poolID, err := getPoolIDFromName(ctx, val, &meta, httpClient) + poolID, err := getPoolIDFromName(ctx, logger, val, &meta, podIdentity, httpClient) if err != nil { - return nil, err + return nil, kedav1alpha1.AuthPodIdentity{}, err } meta.poolID = poolID } else { if val, ok := config.TriggerMetadata["poolID"]; ok && val != "" { var err error - poolID, err := validatePoolID(ctx, val, &meta, httpClient) + poolID, err := validatePoolID(ctx, logger, val, &meta, podIdentity, httpClient) if err != nil { - return nil, err + return nil, kedav1alpha1.AuthPodIdentity{}, err } meta.poolID = poolID } else { - return nil, fmt.Errorf("no poolName or poolID given") + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no poolName or poolID given") } } - // Trim any trailing new lines from the Azure Pipelines PAT + // // Trim any trailing new lines from the Azure Pipelines PAT meta.personalAccessToken = strings.TrimSuffix(meta.personalAccessToken, "\n") meta.scalerIndex = config.ScalerIndex - return &meta, nil + return &meta, podIdentity, nil } -func getPoolIDFromName(ctx context.Context, poolName string, metadata *azurePipelinesMetadata, httpClient *http.Client) (int, error) { +func getPoolIDFromName(ctx context.Context, logger logr.Logger, poolName string, metadata *azurePipelinesMetadata, podIdentity kedav1alpha1.AuthPodIdentity, httpClient *http.Client) (int, error) { url := fmt.Sprintf("%s/_apis/distributedtask/pools?poolName=%s", metadata.organizationURL, poolName) - body, err := getAzurePipelineRequest(ctx, url, metadata, httpClient) + body, err := getAzurePipelineRequest(ctx, logger, url, metadata, podIdentity, httpClient) if err != nil { return -1, err } @@ -304,9 +310,9 @@ func getPoolIDFromName(ctx context.Context, poolName string, metadata *azurePipe return result.Value[0].ID, nil } -func validatePoolID(ctx context.Context, poolID string, metadata *azurePipelinesMetadata, httpClient *http.Client) (int, error) { +func validatePoolID(ctx context.Context, logger logr.Logger, poolID string, metadata *azurePipelinesMetadata, podIdentity kedav1alpha1.AuthPodIdentity, httpClient *http.Client) (int, error) { url := fmt.Sprintf("%s/_apis/distributedtask/pools?poolID=%s", metadata.organizationURL, poolID) - body, err := getAzurePipelineRequest(ctx, url, metadata, httpClient) + body, err := getAzurePipelineRequest(ctx, logger, url, metadata, podIdentity, httpClient) if err != nil { return -1, fmt.Errorf("agent pool with id `%s` not found: %w", poolID, err) } @@ -320,24 +326,27 @@ func validatePoolID(ctx context.Context, poolID string, metadata *azurePipelines return result.ID, nil } -func getAzurePipelineRequest(ctx context.Context, url string, metadata *azurePipelinesMetadata, httpClient *http.Client) ([]byte, error) { +func getAzurePipelineRequest(ctx context.Context, logger logr.Logger, url string, metadata *azurePipelinesMetadata, podIdentity kedav1alpha1.AuthPodIdentity, httpClient *http.Client) ([]byte, error) { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return 
[]byte{}, err } - switch metadata.podIdentityProvider { + switch podIdentity.Provider { case "", kedav1alpha1.PodIdentityProviderNone: //PAT + logger.V(0).Info("making request to ADO REST API using PAT") req.SetBasicAuth("", metadata.personalAccessToken) case kedav1alpha1.PodIdentityProviderAzureWorkload: //ADO Resource token resource := "499b84ac-1321-427f-aa17-267ca6975798" + logger.V(0).Info("making request to ADO REST API using managed identity") aadToken, err := azure.GetAzureADWorkloadIdentityToken(ctx, "", resource) if err != nil { - return []byte{}, fmt.Errorf("cannot create workload identity credentials: %s", err.Error()) + return []byte{}, fmt.Errorf("cannot create workload identity credentials: %w", err) } - req.Header.Set("Authentication", "Bearer "+aadToken.AccessToken) + logger.V(0).Info("token acquired setting auth header as 'bearer XXXXXX'") + req.Header.Set("Authorization", "Bearer "+aadToken.AccessToken) } r, err := httpClient.Do(req) @@ -366,7 +375,7 @@ func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context) } else { url = fmt.Sprintf("%s/_apis/distributedtask/pools/%d/jobrequests?$top=%d", s.metadata.organizationURL, s.metadata.poolID, s.metadata.jobsToFetch) } - body, err := getAzurePipelineRequest(ctx, url, s.metadata, s.httpClient) + body, err := getAzurePipelineRequest(ctx, s.logger, url, s.metadata, s.podIdentity, s.httpClient) if err != nil { return -1, err } diff --git a/tests/scalers/azure/azure_pipelines_aad_wi/azure_pipelines_aad_wi_test.go b/tests/scalers/azure/azure_pipelines_aad_wi/azure_pipelines_aad_wi_test.go new file mode 100644 index 00000000000..8252fc9e9a3 --- /dev/null +++ b/tests/scalers/azure/azure_pipelines_aad_wi/azure_pipelines_aad_wi_test.go @@ -0,0 +1,317 @@ +//go:build e2e +// +build e2e + +package azure_pipelines_test + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "strconv" + "testing" + "time" + + "github.com/joho/godotenv" + "github.com/microsoft/azure-devops-go-api/azuredevops" + "github.com/microsoft/azure-devops-go-api/azuredevops/build" + "github.com/microsoft/azure-devops-go-api/azuredevops/taskagent" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . 
"github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "azure-pipelines-test" +) + +var ( + organizationURL = os.Getenv("AZURE_DEVOPS_ORGANIZATION_URL") + personalAccessToken = os.Getenv("AZURE_DEVOPS_PAT") + project = os.Getenv("AZURE_DEVOPS_PROJECT") + buildID = os.Getenv("AZURE_DEVOPS_BUILD_DEFINITION_ID") + poolName = os.Getenv("AZURE_DEVOPS_POOL_NAME") + poolID = "0" + triggerAuthName = fmt.Sprintf("%s-ta", testName) + testNamespace = fmt.Sprintf("%s-ns", testName) + secretName = fmt.Sprintf("%s-secret", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + minReplicaCount = 0 + maxReplicaCount = 1 +) + +type templateData struct { + TestNamespace string + SecretName string + DeploymentName string + ScaledObjectName string + MinReplicaCount string + MaxReplicaCount string + Pat string + URL string + PoolName string + PoolID string + TriggerAuthName string +} + +const ( + secretTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +data: + personalAccessToken: {{.Pat}} +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: azdevops-agent +spec: + replicas: 1 + selector: + matchLabels: + app: azdevops-agent + template: + metadata: + labels: + app: azdevops-agent + spec: + terminationGracePeriodSeconds: 90 + containers: + - name: azdevops-agent + lifecycle: + preStop: + exec: + command: ["/bin/sleep","60"] + image: ghcr.io/kedacore/tests-azure-pipelines-agent:b3a02cc + env: + - name: AZP_URL + value: {{.URL}} + - name: AZP_TOKEN + valueFrom: + secretKeyRef: + name: {{.SecretName}} + key: personalAccessToken + - name: AZP_POOL + value: {{.PoolName}} +` + + poolIdscaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + pollingInterval: 15 + cooldownPeriod: 5 + triggers: + - type: azure-pipelines + metadata: + organizationURLFromEnv: "AZP_URL" + activationTargetPipelinesQueueLength: "1" + poolID: "{{.PoolID}}" + authenticationRef: + name: {{.TriggerAuthName}} +` + poolNamescaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + pollingInterval: 15 + cooldownPeriod: 5 + triggers: + - type: azure-pipelines + metadata: + organizationURLFromEnv: "AZP_URL" + activationTargetPipelinesQueueLength: "1" + poolName: "{{.PoolName}}" + authenticationRef: + name: {{.TriggerAuthName}} +` + poolTriggerAuthRef = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthName}} + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: azure-workload +` +) + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + require.NotEmpty(t, organizationURL, "AZURE_DEVOPS_ORGANIZATION_URL env variable is required for azure blob test") + require.NotEmpty(t, personalAccessToken, "AZURE_DEVOPS_PAT env variable is required for azure blob test") + require.NotEmpty(t, project, 
"AZURE_DEVOPS_PROJECT env variable is required for azure blob test") + require.NotEmpty(t, buildID, "AZURE_DEVOPS_BUILD_DEFINITION_ID env variable is required for azure blob test") + require.NotEmpty(t, poolName, "AZURE_DEVOPS_POOL_NAME env variable is required for azure blob test") + connection := azuredevops.NewPatConnection(organizationURL, personalAccessToken) + clearAllBuilds(t, connection) + // Get pool ID + poolID = fmt.Sprintf("%d", getAzDoPoolID(t, connection)) + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + WaitForPodCountInNamespace(t, kc, testNamespace, minReplicaCount, 60, 2) + + // test scaling poolId + testActivation(t, kc, connection) + testScaleOut(t, kc, connection) + testScaleIn(t, kc) + + // test scaling PoolName + KubectlApplyWithTemplate(t, data, "poolNamescaledObjectTemplate", poolNamescaledObjectTemplate) + testActivation(t, kc, connection) + testScaleOut(t, kc, connection) + testScaleIn(t, kc) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func getAzDoPoolID(t *testing.T, connection *azuredevops.Connection) int { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + taskClient, err := taskagent.NewClient(ctx, connection) + if err != nil { + t.Errorf("unable to create task agent client") + } + args := taskagent.GetAgentPoolsArgs{ + PoolName: &poolName, + } + pools, err := taskClient.GetAgentPools(ctx, args) + if err != nil { + t.Errorf("unable to get the pools") + } + return *(*pools)[0].Id +} + +func queueBuild(t *testing.T, connection *azuredevops.Connection) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + buildClient, err := build.NewClient(ctx, connection) + if err != nil { + t.Errorf("unable to create build client") + } + id, err := strconv.Atoi(buildID) + if err != nil { + t.Errorf("unable to parse buildID") + } + args := build.QueueBuildArgs{ + Project: &project, + Build: &build.Build{ + Definition: &build.DefinitionReference{ + Id: &id, + }, + }, + } + _, err = buildClient.QueueBuild(ctx, args) + if err != nil { + t.Errorf("unable to get the pools") + } +} + +func clearAllBuilds(t *testing.T, connection *azuredevops.Connection) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + buildClient, err := build.NewClient(ctx, connection) + if err != nil { + t.Errorf("unable to create build client") + } + var top = 20 + args := build.GetBuildsArgs{ + Project: &project, + StatusFilter: &build.BuildStatusValues.All, + QueryOrder: &build.BuildQueryOrderValues.QueueTimeDescending, + Top: &top, + } + azBuilds, err := buildClient.GetBuilds(ctx, args) + if err != nil { + t.Errorf("unable to get builds") + } + for _, azBuild := range azBuilds.Value { + azBuild.Status = &build.BuildStatusValues.Cancelling + args := build.UpdateBuildArgs{ + Build: &azBuild, + Project: &project, + BuildId: azBuild.Id, + } + _, err = buildClient.UpdateBuild(ctx, args) + if err != nil { + t.Errorf("unable to cancel build") + } + } +} + +func getTemplateData() (templateData, []Template) { + base64Pat := base64.StdEncoding.EncodeToString([]byte(personalAccessToken)) + + return templateData{ + TestNamespace: testNamespace, + SecretName: secretName, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + MinReplicaCount: fmt.Sprintf("%v", minReplicaCount), + MaxReplicaCount: 
fmt.Sprintf("%v", maxReplicaCount), + Pat: base64Pat, + URL: organizationURL, + PoolName: poolName, + PoolID: poolID, + TriggerAuthName: triggerAuthName, + }, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "poolTriggerAuthRef", Config: poolTriggerAuthRef}, + {Name: "poolIdscaledObjectTemplate", Config: poolIdscaledObjectTemplate}, + } +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, connection *azuredevops.Connection) { + t.Log("--- testing activation ---") + queueBuild(t, connection) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, connection *azuredevops.Connection) { + t.Log("--- testing scale out ---") + queueBuild(t, connection) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 1), + "replica count should be 2 after 1 minute") +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + assert.True(t, WaitForPodCountInNamespace(t, kc, testNamespace, minReplicaCount, 60, 5), + "pod count should be 0 after 1 minute") +}