Skip to content

Commit

Permalink
chore: cherry pick commits for release v2.11.2 (#4826)
Browse files Browse the repository at this point in the history
Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
Co-authored-by: Dao Thanh Tung <ttdao.2015@accountancy.smu.edu.sg>
Co-authored-by: Eldarrin <32762846+Eldarrin@users.noreply.github.com>
fix `msgBacklogThreshold` field being named wrongly as `msgBacklog` (#4736)
fix: AWS Pod Identity uses default service account if not provided (#4768)
fix: Restore prometheus metrics in Metrics Server (#4766)
fix repo bug (#4792)
  • Loading branch information
JorTurFer committed Jul 27, 2023
1 parent b8dbd29 commit 517ed30
Show file tree
Hide file tree
Showing 17 changed files with 224 additions and 142 deletions.
32 changes: 29 additions & 3 deletions CHANGELOG.md
Expand Up @@ -16,6 +16,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
## History

- [Unreleased](#unreleased)
- [v2.11.2](#v2110)
- [v2.11.1](#v2110)
- [v2.11.0](#v2110)
- [v2.10.1](#v2101)
- [v2.10.0](#v2100)
Expand Down Expand Up @@ -55,9 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### Fixes

- **General**: Paused ScaledObject continues working after removing the annotation ([#4733](https://github.com/kedacore/keda/issues/4733))
- **General**: Skip resolving secrets if namespace is restricted ([#4519](https://github.com/kedacore/keda/issues/4519))
- **Prometheus**: Authenticated connections to Prometheus work in non-PodIdenty case ([#4695](https://github.com/kedacore/keda/issues/4695))
- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))

### Deprecations

Expand All @@ -75,6 +75,32 @@ New deprecation(s):

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))

## v2.11.2

### Fixes

- **General**: Metrics server exposes Prometheus metrics ([#4776](https://github.com/kedacore/keda/issues/4776))
- **AWS Pod Identity Authentication**: Use `default` service account if the workload doesn't set it ([#4767](https://github.com/kedacore/keda/issues/4767))
- **GitHub Runner Scaler**: Fix rate checking on GHEC when HTTP 200 ([#4786](https://github.com/kedacore/keda/issues/4786))
- **GitHub Runner Scaler**: Fix explicit repo check 404 to skip not crash ([#4790](https://github.com/kedacore/keda/issues/4790))
- **Pulsar Scaler**: Fix `msgBacklogThreshold` field being named wrongly as `msgBacklog` ([#4681](https://github.com/kedacore/keda/issues/4681))

### Deprecations

You can find all deprecations in [this overview](https://github.com/kedacore/keda/issues?q=is%3Aissue+is%3Aopen+sort%3Aupdated-desc+label%3Abreaking-change) and [join the discussion here](https://github.com/kedacore/keda/discussions/categories/deprecations).

New deprecation(s):

- **Pulsar Scaler**: Fix `msgBacklogThreshold` field being named wrongly as `msgBacklog` ([#4681](https://github.com/kedacore/keda/issues/4681))

## v2.11.1

### Fixes

- **General**: Paused ScaledObject continues working after removing the annotation ([#4733](https://github.com/kedacore/keda/issues/4733))
- **General**: Skip resolving secrets if namespace is restricted ([#4519](https://github.com/kedacore/keda/issues/4519))
- **Prometheus**: Authenticated connections to Prometheus work in non-PodIdenty case ([#4695](https://github.com/kedacore/keda/issues/4695))

## v2.11.0

### New
Expand Down
72 changes: 22 additions & 50 deletions cmd/adapter/main.go
Expand Up @@ -21,15 +21,9 @@ import (
"flag"
"fmt"
"os"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -40,7 +34,6 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/metricsservice"
kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -62,44 +55,44 @@ var (
metricsServiceAddr string
)

func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration) (provider.ExternalMetricsProvider, error) {
func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsProvider, <-chan struct{}, error) {
scheme := scheme.Scheme
if err := appsv1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add apps/v1 scheme to runtime scheme")
return nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
return nil, nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
}
if err := kedav1alpha1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add keda scheme to runtime scheme")
return nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
return nil, nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
}
namespace, err := getWatchNamespace()
if err != nil {
logger.Error(err, "failed to get watch namespace")
return nil, fmt.Errorf("failed to get watch namespace (%s)", err)
return nil, nil, fmt.Errorf("failed to get watch namespace (%s)", err)
}

leaseDuration, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION")
if err != nil {
logger.Error(err, "invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION")
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION (%s)", err)
}

renewDeadline, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE")
if err != nil {
logger.Error(err, "Invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE")
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE (%s)", err)
}

retryPeriod, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD")
if err != nil {
logger.Error(err, "Invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD")
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD (%s)", err)
}

useMetricsServiceGrpc, err := kedautil.ResolveOsEnvBool("KEDA_USE_METRICS_SERVICE_GRPC", true)
if err != nil {
logger.Error(err, "Invalid KEDA_USE_METRICS_SERVICE_GRPC")
return nil, fmt.Errorf("invalid KEDA_USE_METRICS_SERVICE_GRPC (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_USE_METRICS_SERVICE_GRPC (%s)", err)
}

// Get a config to talk to the apiserver
Expand All @@ -121,38 +114,24 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
})
if err != nil {
logger.Error(err, "failed to setup manager")
return nil, err
return nil, nil, err
}

broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})

kubeClientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Error(err, "Unable to create kube clientset")
return nil, err
}
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
logger.Error(err, "Unable to get cluster object namespace")
return nil, err
}
// the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace,
// refer to https://github.com/kedacore/keda/issues/3668
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Hour, kubeinformers.WithNamespace(objectNamespace))
secretInformer := kubeInformerFactory.Core().V1().Secrets()

handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder, secretInformer.Lister())
kubeInformerFactory.Start(ctx.Done())

logger.Info("Connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
grpcClient, err := metricsservice.NewGrpcClient(metricsServiceAddr, a.SecureServing.ServerCert.CertDirectory)
if err != nil {
logger.Error(err, "error connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
return nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace), nil
return nil, nil, err
}
stopCh := make(chan struct{})
go func() {
if err := mgr.Start(ctx); err != nil {
logger.Error(err, "controller-runtime encountered an error")
stopCh <- struct{}{}
close(stopCh)
}
}()
return kedaprovider.NewProvider(ctx, logger, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace), stopCh, nil
}

// generateDefaultMetricsServiceAddr generates default Metrics Service gRPC Server address based on the current Namespace.
Expand Down Expand Up @@ -217,27 +196,20 @@ func main() {

ctrl.SetLogger(logger)

// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutMS, err := kedautil.ResolveOsEnvInt("KEDA_HTTP_DEFAULT_TIMEOUT", 3000)
if err != nil {
logger.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
return
}

err = printWelcomeMsg(cmd)
if err != nil {
return
}

kedaProvider, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond)
kedaProvider, stopCh, err := cmd.makeProvider(ctx)
if err != nil {
logger.Error(err, "making provider")
return
}
cmd.WithExternalMetrics(kedaProvider)

logger.Info(cmd.Message)
if err = cmd.Run(wait.NeverStop); err != nil {
if err = cmd.Run(stopCh); err != nil {
return
}
}
4 changes: 2 additions & 2 deletions config/metrics-server/service.yaml
Expand Up @@ -13,8 +13,8 @@ spec:
- name: https
port: 443
targetPort: 6443
- name: http
port: 80
- name: metrics
port: 8080
targetPort: 8080
selector:
app: keda-metrics-apiserver
5 changes: 1 addition & 4 deletions pkg/provider/provider.go
Expand Up @@ -29,15 +29,13 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/scaling"
)

// KedaProvider implements External Metrics Provider
type KedaProvider struct {
defaults.DefaultExternalMetricsProvider

client client.Client
scaleHandler scaling.ScaleHandler
watchedNamespace string
ctx context.Context

Expand All @@ -52,10 +50,9 @@ var (
)

// NewProvider returns an instance of KedaProvider
func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, grpcClient metricsservice.GrpcClient, useMetricsServiceGrpc bool, watchedNamespace string) provider.ExternalMetricsProvider {
func NewProvider(ctx context.Context, adapterLogger logr.Logger, client client.Client, grpcClient metricsservice.GrpcClient, useMetricsServiceGrpc bool, watchedNamespace string) provider.ExternalMetricsProvider {
provider := &KedaProvider{
client: client,
scaleHandler: scaleHandler,
watchedNamespace: watchedNamespace,
ctx: ctx,
grpcClient: grpcClient,
Expand Down
10 changes: 5 additions & 5 deletions pkg/scalers/authentication/authentication_types.go
Expand Up @@ -6,15 +6,15 @@ import "time"
type Type string

const (
// APIKeyAuthType is a auth type using an API key
// APIKeyAuthType is an auth type using an API key
APIKeyAuthType Type = "apiKey"
// BasicAuthType is a auth type using basic auth
// BasicAuthType is an auth type using basic auth
BasicAuthType Type = "basic"
// TLSAuthType is a auth type using TLS
// TLSAuthType is an auth type using TLS
TLSAuthType Type = "tls"
// BearerAuthType is a auth type using a bearer token
// BearerAuthType is an auth type using a bearer token
BearerAuthType Type = "bearer"
// CustomAuthType is a auth type using a custom header
// CustomAuthType is an auth type using a custom header
CustomAuthType Type = "custom"
)

Expand Down
40 changes: 23 additions & 17 deletions pkg/scalers/github_runner_scaler.go
Expand Up @@ -479,7 +479,7 @@ func (s *githubRunnerScaler) getRepositories(ctx context.Context) ([]string, err
default:
return nil, fmt.Errorf("runnerScope %s not supported", s.metadata.runnerScope)
}
body, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
body, _, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
if err != nil {
return nil, err
}
Expand All @@ -498,10 +498,10 @@ func (s *githubRunnerScaler) getRepositories(ctx context.Context) ([]string, err
return repoList, nil
}

func getGithubRequest(ctx context.Context, url string, metadata *githubRunnerMetadata, httpClient *http.Client) ([]byte, error) {
func getGithubRequest(ctx context.Context, url string, metadata *githubRunnerMetadata, httpClient *http.Client) ([]byte, int, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return []byte{}, err
return []byte{}, -1, err
}

req.Header.Set("Accept", "application/vnd.github.v3+json")
Expand All @@ -513,27 +513,29 @@ func getGithubRequest(ctx context.Context, url string, metadata *githubRunnerMet

r, err := httpClient.Do(req)
if err != nil {
return []byte{}, err
return []byte{}, -1, err
}

b, err := io.ReadAll(r.Body)
if err != nil {
return []byte{}, err
return []byte{}, -1, err
}
_ = r.Body.Close()

if r.StatusCode != 200 || r.Header.Get("X-RateLimit-Remaining") == "" {
return []byte{}, fmt.Errorf("the GitHub REST API returned error. url: %s status: %d response: %s", url, r.StatusCode, string(b))
}
if r.StatusCode != 200 {
if r.Header.Get("X-RateLimit-Remaining") != "" {
githubAPIRemaining, _ := strconv.Atoi(r.Header.Get("X-RateLimit-Remaining"))

githubAPIRemaining, _ := strconv.Atoi(r.Header.Get("X-RateLimit-Remaining"))
if githubAPIRemaining == 0 {
resetTime, _ := strconv.ParseInt(r.Header.Get("X-RateLimit-Reset"), 10, 64)
return []byte{}, r.StatusCode, fmt.Errorf("GitHub API rate limit exceeded, resets at %s", time.Unix(resetTime, 0))
}
}

if githubAPIRemaining == 0 {
resetTime, _ := strconv.ParseInt(r.Header.Get("X-RateLimit-Reset"), 10, 64)
return []byte{}, fmt.Errorf("GitHub API rate limit exceeded, resets at %s", time.Unix(resetTime, 0))
return []byte{}, r.StatusCode, fmt.Errorf("the GitHub REST API returned error. url: %s status: %d response: %s", url, r.StatusCode, string(b))
}

return b, nil
return b, r.StatusCode, nil
}

func stripDeadRuns(allWfrs []WorkflowRuns) []WorkflowRun {
Expand All @@ -551,7 +553,7 @@ func stripDeadRuns(allWfrs []WorkflowRuns) []WorkflowRun {
// getWorkflowRunJobs returns a list of jobs for a given workflow run
func (s *githubRunnerScaler) getWorkflowRunJobs(ctx context.Context, workflowRunID int64, repoName string) ([]Job, error) {
url := fmt.Sprintf("%s/repos/%s/%s/actions/runs/%d/jobs", s.metadata.githubAPIURL, s.metadata.owner, repoName, workflowRunID)
body, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
body, _, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
if err != nil {
return nil, err
}
Expand All @@ -568,8 +570,10 @@ func (s *githubRunnerScaler) getWorkflowRunJobs(ctx context.Context, workflowRun
// getWorkflowRuns returns a list of workflow runs for a given repository
func (s *githubRunnerScaler) getWorkflowRuns(ctx context.Context, repoName string) (*WorkflowRuns, error) {
url := fmt.Sprintf("%s/repos/%s/%s/actions/runs", s.metadata.githubAPIURL, s.metadata.owner, repoName)
body, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
if err != nil {
body, statusCode, err := getGithubRequest(ctx, url, s.metadata, s.httpClient)
if err != nil && statusCode == 404 {
return nil, nil
} else if err != nil {
return nil, err
}

Expand Down Expand Up @@ -618,7 +622,9 @@ func (s *githubRunnerScaler) GetWorkflowQueueLength(ctx context.Context) (int64,
if err != nil {
return -1, err
}
allWfrs = append(allWfrs, *wfrs)
if wfrs != nil {
allWfrs = append(allWfrs, *wfrs)
}
}

var queueCount int64
Expand Down

0 comments on commit 517ed30

Please sign in to comment.