Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cherry pick commits for release v2.11.2 #4826

Merged
merged 7 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 29 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
## History

- [Unreleased](#unreleased)
- [v2.11.2](#v2110)
- [v2.11.1](#v2110)
- [v2.11.0](#v2110)
- [v2.10.1](#v2101)
- [v2.10.0](#v2100)
Expand Down Expand Up @@ -55,9 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### Fixes

- **General**: Paused ScaledObject continues working after removing the annotation ([#4733](https://github.com/kedacore/keda/issues/4733))
- **General**: Skip resolving secrets if namespace is restricted ([#4519](https://github.com/kedacore/keda/issues/4519))
- **Prometheus**: Authenticated connections to Prometheus work in non-PodIdenty case ([#4695](https://github.com/kedacore/keda/issues/4695))
- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))

### Deprecations

Expand All @@ -75,6 +75,32 @@ New deprecation(s):

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))

## v2.11.2

### Fixes

- **General**: Metrics server exposes Prometheus metrics ([#4776](https://github.com/kedacore/keda/issues/4776))
- **AWS Pod Identity Authentication**: Use `default` service account if the workload doesn't set it ([#4767](https://github.com/kedacore/keda/issues/4767))
- **GitHub Runner Scaler**: Fix rate checking on GHEC when HTTP 200 ([#4786](https://github.com/kedacore/keda/issues/4786))
- **GitHub Runner Scaler**: Fix explicit repo check 404 to skip not crash ([#4790](https://github.com/kedacore/keda/issues/4790))
- **Pulsar Scaler**: Fix `msgBacklogThreshold` field being named wrongly as `msgBacklog` ([#4681](https://github.com/kedacore/keda/issues/4681))

### Deprecations

You can find all deprecations in [this overview](https://github.com/kedacore/keda/issues?q=is%3Aissue+is%3Aopen+sort%3Aupdated-desc+label%3Abreaking-change) and [join the discussion here](https://github.com/kedacore/keda/discussions/categories/deprecations).

New deprecation(s):

- **Pulsar Scaler**: Fix `msgBacklogThreshold` field being named wrongly as `msgBacklog` ([#4681](https://github.com/kedacore/keda/issues/4681))

## v2.11.1

### Fixes

- **General**: Paused ScaledObject continues working after removing the annotation ([#4733](https://github.com/kedacore/keda/issues/4733))
- **General**: Skip resolving secrets if namespace is restricted ([#4519](https://github.com/kedacore/keda/issues/4519))
- **Prometheus**: Authenticated connections to Prometheus work in non-PodIdenty case ([#4695](https://github.com/kedacore/keda/issues/4695))

## v2.11.0

### New
Expand Down
72 changes: 22 additions & 50 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,9 @@ import (
"flag"
"fmt"
"os"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -40,7 +34,6 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/metricsservice"
kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -62,44 +55,44 @@ var (
metricsServiceAddr string
)

func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration) (provider.ExternalMetricsProvider, error) {
func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsProvider, <-chan struct{}, error) {
scheme := scheme.Scheme
if err := appsv1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add apps/v1 scheme to runtime scheme")
return nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
return nil, nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
}
if err := kedav1alpha1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add keda scheme to runtime scheme")
return nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
return nil, nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
}
namespace, err := getWatchNamespace()
if err != nil {
logger.Error(err, "failed to get watch namespace")
return nil, fmt.Errorf("failed to get watch namespace (%s)", err)
return nil, nil, fmt.Errorf("failed to get watch namespace (%s)", err)
}

leaseDuration, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION")
if err != nil {
logger.Error(err, "invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION")
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION (%s)", err)
}

renewDeadline, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE")
if err != nil {
logger.Error(err, "Invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE")
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE (%s)", err)
}

retryPeriod, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD")
if err != nil {
logger.Error(err, "Invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD")
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD (%s)", err)
}

useMetricsServiceGrpc, err := kedautil.ResolveOsEnvBool("KEDA_USE_METRICS_SERVICE_GRPC", true)
if err != nil {
logger.Error(err, "Invalid KEDA_USE_METRICS_SERVICE_GRPC")
return nil, fmt.Errorf("invalid KEDA_USE_METRICS_SERVICE_GRPC (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_USE_METRICS_SERVICE_GRPC (%s)", err)
}

// Get a config to talk to the apiserver
Expand All @@ -121,38 +114,24 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
})
if err != nil {
logger.Error(err, "failed to setup manager")
return nil, err
return nil, nil, err
}

broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})

kubeClientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Error(err, "Unable to create kube clientset")
return nil, err
}
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
logger.Error(err, "Unable to get cluster object namespace")
return nil, err
}
// the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace,
// refer to https://github.com/kedacore/keda/issues/3668
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Hour, kubeinformers.WithNamespace(objectNamespace))
secretInformer := kubeInformerFactory.Core().V1().Secrets()

handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder, secretInformer.Lister())
kubeInformerFactory.Start(ctx.Done())

logger.Info("Connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
grpcClient, err := metricsservice.NewGrpcClient(metricsServiceAddr, a.SecureServing.ServerCert.CertDirectory)
if err != nil {
logger.Error(err, "error connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
return nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace), nil
return nil, nil, err
}
stopCh := make(chan struct{})
go func() {
if err := mgr.Start(ctx); err != nil {
logger.Error(err, "controller-runtime encountered an error")
stopCh <- struct{}{}
close(stopCh)
}
}()
return kedaprovider.NewProvider(ctx, logger, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace), stopCh, nil
}

// generateDefaultMetricsServiceAddr generates default Metrics Service gRPC Server address based on the current Namespace.
Expand Down Expand Up @@ -217,27 +196,20 @@ func main() {

ctrl.SetLogger(logger)

// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutMS, err := kedautil.ResolveOsEnvInt("KEDA_HTTP_DEFAULT_TIMEOUT", 3000)
if err != nil {
logger.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
return
}

err = printWelcomeMsg(cmd)
if err != nil {
return
}

kedaProvider, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond)
kedaProvider, stopCh, err := cmd.makeProvider(ctx)
if err != nil {
logger.Error(err, "making provider")
return
}
cmd.WithExternalMetrics(kedaProvider)

logger.Info(cmd.Message)
if err = cmd.Run(wait.NeverStop); err != nil {
if err = cmd.Run(stopCh); err != nil {
return
}
}
4 changes: 2 additions & 2 deletions config/metrics-server/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ spec:
- name: https
port: 443
targetPort: 6443
- name: http
port: 80
- name: metrics
port: 8080
targetPort: 8080
selector:
app: keda-metrics-apiserver
5 changes: 1 addition & 4 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/scaling"
)

// KedaProvider implements External Metrics Provider
type KedaProvider struct {
defaults.DefaultExternalMetricsProvider

client client.Client
scaleHandler scaling.ScaleHandler
watchedNamespace string
ctx context.Context

Expand All @@ -52,10 +50,9 @@ var (
)

// NewProvider returns an instance of KedaProvider
func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, grpcClient metricsservice.GrpcClient, useMetricsServiceGrpc bool, watchedNamespace string) provider.ExternalMetricsProvider {
func NewProvider(ctx context.Context, adapterLogger logr.Logger, client client.Client, grpcClient metricsservice.GrpcClient, useMetricsServiceGrpc bool, watchedNamespace string) provider.ExternalMetricsProvider {
provider := &KedaProvider{
client: client,
scaleHandler: scaleHandler,
watchedNamespace: watchedNamespace,
ctx: ctx,
grpcClient: grpcClient,
Expand Down
10 changes: 5 additions & 5 deletions pkg/scalers/authentication/authentication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import "time"
type Type string

const (
// APIKeyAuthType is a auth type using an API key
// APIKeyAuthType is an auth type using an API key
APIKeyAuthType Type = "apiKey"
// BasicAuthType is a auth type using basic auth
// BasicAuthType is an auth type using basic auth
BasicAuthType Type = "basic"
// TLSAuthType is a auth type using TLS
// TLSAuthType is an auth type using TLS
TLSAuthType Type = "tls"
// BearerAuthType is a auth type using a bearer token
// BearerAuthType is an auth type using a bearer token
BearerAuthType Type = "bearer"
// CustomAuthType is a auth type using a custom header
// CustomAuthType is an auth type using a custom header
CustomAuthType Type = "custom"
)

Expand Down
40 changes: 23 additions & 17 deletions pkg/scalers/github_runner_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (s *githubRunnerScaler) getRepositories(ctx context.Context) ([]string, err
default:
return nil, fmt.Errorf("runnerScope %s not supported", s.metadata.runnerScope)
}
body, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
body, _, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
if err != nil {
return nil, err
}
Expand All @@ -498,10 +498,10 @@ func (s *githubRunnerScaler) getRepositories(ctx context.Context) ([]string, err
return repoList, nil
}

func getGithubRequest(ctx context.Context, url string, metadata *githubRunnerMetadata, httpClient *http.Client) ([]byte, error) {
func getGithubRequest(ctx context.Context, url string, metadata *githubRunnerMetadata, httpClient *http.Client) ([]byte, int, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return []byte{}, err
return []byte{}, -1, err
}

req.Header.Set("Accept", "application/vnd.github.v3+json")
Expand All @@ -513,27 +513,29 @@ func getGithubRequest(ctx context.Context, url string, metadata *githubRunnerMet

r, err := httpClient.Do(req)
if err != nil {
return []byte{}, err
return []byte{}, -1, err
}

b, err := io.ReadAll(r.Body)
if err != nil {
return []byte{}, err
return []byte{}, -1, err
}
_ = r.Body.Close()

if r.StatusCode != 200 || r.Header.Get("X-RateLimit-Remaining") == "" {
return []byte{}, fmt.Errorf("the GitHub REST API returned error. url: %s status: %d response: %s", url, r.StatusCode, string(b))
}
if r.StatusCode != 200 {
if r.Header.Get("X-RateLimit-Remaining") != "" {
githubAPIRemaining, _ := strconv.Atoi(r.Header.Get("X-RateLimit-Remaining"))

githubAPIRemaining, _ := strconv.Atoi(r.Header.Get("X-RateLimit-Remaining"))
if githubAPIRemaining == 0 {
resetTime, _ := strconv.ParseInt(r.Header.Get("X-RateLimit-Reset"), 10, 64)
return []byte{}, r.StatusCode, fmt.Errorf("GitHub API rate limit exceeded, resets at %s", time.Unix(resetTime, 0))
}
}

if githubAPIRemaining == 0 {
resetTime, _ := strconv.ParseInt(r.Header.Get("X-RateLimit-Reset"), 10, 64)
return []byte{}, fmt.Errorf("GitHub API rate limit exceeded, resets at %s", time.Unix(resetTime, 0))
return []byte{}, r.StatusCode, fmt.Errorf("the GitHub REST API returned error. url: %s status: %d response: %s", url, r.StatusCode, string(b))
}

return b, nil
return b, r.StatusCode, nil
}

func stripDeadRuns(allWfrs []WorkflowRuns) []WorkflowRun {
Expand All @@ -551,7 +553,7 @@ func stripDeadRuns(allWfrs []WorkflowRuns) []WorkflowRun {
// getWorkflowRunJobs returns a list of jobs for a given workflow run
func (s *githubRunnerScaler) getWorkflowRunJobs(ctx context.Context, workflowRunID int64, repoName string) ([]Job, error) {
url := fmt.Sprintf("%s/repos/%s/%s/actions/runs/%d/jobs", s.metadata.githubAPIURL, s.metadata.owner, repoName, workflowRunID)
body, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
body, _, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
if err != nil {
return nil, err
}
Expand All @@ -568,8 +570,10 @@ func (s *githubRunnerScaler) getWorkflowRunJobs(ctx context.Context, workflowRun
// getWorkflowRuns returns a list of workflow runs for a given repository
func (s *githubRunnerScaler) getWorkflowRuns(ctx context.Context, repoName string) (*WorkflowRuns, error) {
url := fmt.Sprintf("%s/repos/%s/%s/actions/runs", s.metadata.githubAPIURL, s.metadata.owner, repoName)
body, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
if err != nil {
body, statusCode, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
if err != nil && statusCode == 404 {
return nil, nil
} else if err != nil {
return nil, err
}

Expand Down Expand Up @@ -618,7 +622,9 @@ func (s *githubRunnerScaler) GetWorkflowQueueLength(ctx context.Context) (int64,
if err != nil {
return -1, err
}
allWfrs = append(allWfrs, *wfrs)
if wfrs != nil {
allWfrs = append(allWfrs, *wfrs)
}
}

var queueCount int64
Expand Down