Allowing cluster-global operation (kedacore#269)
* Allowing cluster-global operation

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* passing less unnecessary data in the operator

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* setting namespace

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* updating tests

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* Updating deployment cache interfaces

To accommodate multi-namespace gets and watches

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* starting on deployment cache informer

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* switching over to informer-based deployment cache

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* fixing test compile errors and merging fake and in-memory deployment caches

Signed-off-by: Aaron <aaron@ecomaz.net>

* fixing tests

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* fixing more compile errs

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* improving logging, and several other small changes

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* go mod tidy

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* Adding config validation

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* improving ns and svc name var names to indicate interceptor

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* running go mod tidy

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* removing unused code

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* passing a function to transform target to in-cluster URL

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* not requiring namespace for operator

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* splitting namespace config for operator into watch and current

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* removing appInfo, passing current namespace everywhere in its place

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* using proper namespace when creating scaled object

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* allowing xkcd chart to set ingress namespace

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* printing namespace in error

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* using proper fully-qualified hostname of external scaler in scaledobject

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* adding note on cluster-global vs. namespaced mode

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* adding note about installing the xkcd chart in cluster-global mode

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* fixing hostname test

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* merging scaler queue counts with routing table hosts. removing merge functionality from interceptors

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* fix assumption in tests that queue has all hosts from routing table

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* adding test for MergeCountsWithRoutingTable

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* sleep for longer to wait for server to start

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* adding handler test for merging hosts

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* adding test to GetMetrics test cases for host not in queue pinger

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>
arschles committed Jan 13, 2022
1 parent 99b0e9a commit eb4e1e9
Showing 46 changed files with 913 additions and 1,555 deletions.
12 changes: 10 additions & 2 deletions docs/install.md
@@ -6,8 +6,13 @@ The HTTP Add On is highly modular and, as expected, builds on top of KEDA core.
- **Scaler** - communicates scaling-related metrics to KEDA. By default, the operator will install this for you as necessary.
- **Interceptor** - a cluster-internal proxy that proxies incoming HTTP requests, communicating HTTP queue size metrics to the scaler, and holding requests in a temporary request queue when there are not yet any available app `Pod`s ready to serve. By default, the operator will install this for you as necessary.

>There is [pending work in KEDA](https://github.com/kedacore/keda/issues/615) that will eventually make this component optional. See [issue #6 in this repository](https://github.com/kedacore/http-add-on/issues/6) for even more background
>There is [pending work](https://github.com/kedacore/http-add-on/issues/354) that may eventually make this component optional.
## Before You Start: Cluster-global vs. Namespaced installation

Both KEDA and the HTTP Addon can be installed in either cluster-global or namespaced mode. In the former case, your `ScaledObject`s and `HTTPScaledObject`s (respectively) can be installed in any namespace, and a single installation will detect and process them. In the latter case, you must install your `ScaledObject`s and `HTTPScaledObject`s in a specific namespace.

You can install KEDA and the HTTP Addon in either mode, but if you install one as cluster-global, the other must also be cluster-global. Similarly, if you install one as namespaced, the other must also be namespaced, and in the same namespace. The installation commands below show how to do this.
## Installing KEDA

Before you install any of these components, you need to install KEDA. Below are simplified instructions for doing so with [Helm](https://helm.sh), but if you need anything more customized, please see the [official KEDA deployment documentation](https://keda.sh/docs/2.0/deploy/). If you need to install Helm, refer to the [installation guide](https://helm.sh/docs/intro/install/).
@@ -17,16 +22,19 @@ Before you install any of these components, you need to install KEDA. Below are
```console
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda --namespace ${NAMESPACE} --set watchNamespace=${NAMESPACE} --create-namespace
helm install keda kedacore/keda --namespace ${NAMESPACE} --create-namespace
```

>The above command installs KEDA in cluster-global mode. Add `--set watchNamespace=<target namespace>` to install KEDA in namespaced mode.
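
For example, a namespaced KEDA installation scoped to a single namespace might look like this (a sketch only; `${NAMESPACE}` stands in for whatever namespace you choose):

```console
helm install keda kedacore/keda --namespace ${NAMESPACE} --set watchNamespace=${NAMESPACE} --create-namespace
```
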
## Install via Helm Chart

The Helm chart for this project is in KEDA's default Helm repository at [kedacore/charts](http://github.com/kedacore/charts); you can install it by running:

```console
helm install http-add-on kedacore/keda-add-ons-http --namespace ${NAMESPACE}
```
>The above command installs the HTTP Addon in cluster-global mode. Add `--set operator.watchNamespace=<target namespace>` to install the HTTP Addon in namespaced mode. If you do this, you must also install KEDA in namespaced mode and use the same target namespace.
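
For example, a namespaced installation of the add-on that pairs with a namespaced KEDA might look like this (a sketch only; `${NAMESPACE}` must match the namespace used for KEDA above):

```console
helm install http-add-on kedacore/keda-add-ons-http --namespace ${NAMESPACE} --set operator.watchNamespace=${NAMESPACE}
```
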
>Installing the HTTP add on won't affect any running workloads in your cluster. You'll need to install an `HTTPScaledObject` for each individual `Deployment` you want to scale. For more on how to do that, please see the [walkthrough](./walkthrough.md).
2 changes: 2 additions & 0 deletions docs/walkthrough.md
@@ -16,6 +16,8 @@ helm install xkcd ./examples/xkcd -n ${NAMESPACE}

You'll need to clone the repository to get access to this chart. If you have your own `Deployment` and `Service` installed, you can go right to creating an `HTTPScaledObject` in the next section.

>If you are running KEDA and the HTTP Addon in cluster-global mode, you can install the XKCD chart in any namespace you choose. If you do so, make sure you add `--set ingressNamespace=${NAMESPACE}` to the above installation command.
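
For example, in cluster-global mode you might install the example chart into a separate application namespace while pointing its ingress at the namespace where the add-on is installed (a sketch only; `${APP_NAMESPACE}` is an illustrative name for the namespace you choose, and `${NAMESPACE}` is the add-on's namespace):

```console
helm install xkcd ./examples/xkcd -n ${APP_NAMESPACE} --set ingressNamespace=${NAMESPACE}
```
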
>To remove the app, run `helm delete xkcd -n ${NAMESPACE}`
## Creating an `HTTPScaledObject`
1 change: 1 addition & 0 deletions examples/xkcd/templates/ingress.yaml
@@ -2,6 +2,7 @@ apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ include "xkcd.fullname" . }}
namespace: {{ .Values.ingressNamespace | default .Release.Namespace }}
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
kubernetes.io/ingress.class: nginx
5 changes: 5 additions & 0 deletions examples/xkcd/values.yaml
@@ -1,5 +1,10 @@
replicaCount: 1
host: myhost.com
# This is the namespace that the ingress should be installed
# into. It should be set to the same namespace that the
# KEDA HTTP components are installed in. Defaults to the Helm
# chart release namespace.
ingressNamespace:
image:
repository: arschles/xkcd
pullPolicy: Always
1 change: 0 additions & 1 deletion go.mod
@@ -8,7 +8,6 @@ require (
github.com/golang/protobuf v1.5.2
github.com/kelseyhightower/envconfig v1.4.0
github.com/magefile/mage v1.12.1
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.17.0
github.com/pkg/errors v0.9.1
2 changes: 0 additions & 2 deletions go.sum
@@ -286,8 +286,6 @@ github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
2 changes: 1 addition & 1 deletion interceptor/config/timeouts.go
@@ -48,7 +48,7 @@ func (t *Timeouts) Backoff(factor, jitter float64, steps int) wait.Backoff {

// DefaultBackoff calls t.Backoff with reasonable defaults and returns
// the result
func (t *Timeouts) DefaultBackoff() wait.Backoff {
func (t Timeouts) DefaultBackoff() wait.Backoff {
return t.Backoff(2, 0.5, 5)
}

19 changes: 19 additions & 0 deletions interceptor/config/validate.go
@@ -0,0 +1,19 @@
package config

import (
"fmt"
"time"
)

func Validate(srvCfg Serving, timeoutsCfg Timeouts) error {
deplCachePollInterval := time.Duration(srvCfg.DeploymentCachePollIntervalMS) * time.Millisecond
if timeoutsCfg.DeploymentReplicas < deplCachePollInterval {
return fmt.Errorf(
"deployment replicas timeout (%s) should not be less than the Deployment Cache Poll Interval (%s)",
timeoutsCfg.DeploymentReplicas,
deplCachePollInterval,
)

}
return nil
}
43 changes: 29 additions & 14 deletions interceptor/forward_wait_func.go
@@ -3,41 +3,56 @@ package main
import (
"context"
"fmt"
"log"

"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/pkg/k8s"
appsv1 "k8s.io/api/apps/v1"
)

type forwardWaitFunc func(context.Context, string) error
// forwardWaitFunc is a function that waits for a condition
// before proceeding to serve the request.
type forwardWaitFunc func(context.Context, string, string) error

func deploymentCanServe(depl appsv1.Deployment) bool {
return depl.Status.ReadyReplicas > 0
}

func newDeployReplicasForwardWaitFunc(
lggr logr.Logger,
deployCache k8s.DeploymentCache,
) forwardWaitFunc {
return func(ctx context.Context, deployName string) error {
deployment, err := deployCache.Get(deployName)
return func(ctx context.Context, deployNS, deployName string) error {
// get a watcher & its result channel before querying the
// deployment cache, to ensure we don't miss events
watcher := deployCache.Watch(deployNS, deployName)
eventCh := watcher.ResultChan()
defer watcher.Stop()

deployment, err := deployCache.Get(deployNS, deployName)
if err != nil {
// if we didn't get the initial deployment state, bail out
return fmt.Errorf("error getting state for deployment %s (%s)", deployName, err)
return fmt.Errorf(
"error getting state for deployment %s/%s (%s)",
deployNS,
deployName,
err,
)
}
// if there is 1 or more replica, we're done waiting
if deployment.Status.ReadyReplicas > 0 {
if deploymentCanServe(deployment) {
return nil
}
watcher := deployCache.Watch(deployName)
if err != nil {
return fmt.Errorf("error getting the stream of deployment changes")
}
defer watcher.Stop()
eventCh := watcher.ResultChan()

for {
select {
case event := <-eventCh:
deployment, ok := event.Object.(*appsv1.Deployment)
if !ok {
log.Println("Didn't get a deployment back in event")
lggr.Info(
"Didn't get a deployment back in event",
)
}
if deployment.Status.ReadyReplicas > 0 {
if deploymentCanServe(*deployment) {
return nil
}
case <-ctx.Done():
54 changes: 29 additions & 25 deletions interceptor/forward_wait_func_test.go
@@ -5,10 +5,10 @@ import (
"testing"
"time"

"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
@@ -22,28 +22,28 @@ func TestForwardWaitFuncOneReplica(t *testing.T) {
r := require.New(t)
const ns = "testNS"
const deployName = "TestForwardingHandlerDeploy"
cache := k8s.NewMemoryDeploymentCache(map[string]appsv1.Deployment{
deployName: *newDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
corev1.PullAlways,
),
})
cache := k8s.NewFakeDeploymentCache()
cache.AddDeployment(*newDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
corev1.PullAlways,
))

ctx, done := context.WithTimeout(ctx, waitFuncWait)
defer done()
group, ctx := errgroup.WithContext(ctx)

waitFunc := newDeployReplicasForwardWaitFunc(
logr.Discard(),
cache,
)

group.Go(func() error {
return waitFunc(ctx, deployName)
return waitFunc(ctx, ns, deployName)
})
r.NoError(group.Wait(), "wait function failed, but it shouldn't have")
}
@@ -66,17 +66,17 @@ func TestForwardWaitFuncNoReplicas(t *testing.T) {
corev1.PullAlways,
)
deployment.Status.ReadyReplicas = 0
cache := k8s.NewMemoryDeploymentCache(map[string]appsv1.Deployment{
deployName: *deployment,
})
cache := k8s.NewFakeDeploymentCache()
cache.AddDeployment(*deployment)

ctx, done := context.WithTimeout(ctx, waitFuncWait)
defer done()
waitFunc := newDeployReplicasForwardWaitFunc(
logr.Discard(),
cache,
)

err := waitFunc(ctx, deployName)
err := waitFunc(ctx, ns, deployName)
r.Error(err)
}

@@ -97,25 +97,29 @@ func TestWaitFuncWaitsUntilReplicas(t *testing.T) {
corev1.PullAlways,
)
deployment.Spec.Replicas = k8s.Int32P(0)
cache := k8s.NewMemoryDeploymentCache(map[string]appsv1.Deployment{
deployName: *deployment,
})
cache := k8s.NewFakeDeploymentCache()
cache.AddDeployment(*deployment)
// create a watcher first so that the goroutine
// can later fetch it and send a message on it
cache.Watch(ns, deployName)

ctx, done := context.WithTimeout(ctx, totalWaitDur)
defer done()
waitFunc := newDeployReplicasForwardWaitFunc(
logr.Discard(),
cache,
)

// this channel will be closed immediately after the replicas were increased
replicasIncreasedCh := make(chan struct{})
go func() {
time.Sleep(totalWaitDur / 2)
cache.RWM.RLock()
defer cache.RWM.RUnlock()
watcher := cache.Watchers[deployName]
watcher := cache.GetWatcher(ns, deployName)
r.NotNil(watcher, "watcher was not found")
modifiedDeployment := deployment.DeepCopy()
modifiedDeployment.Spec.Replicas = k8s.Int32P(1)
watcher.Action(watch.Modified, modifiedDeployment)
close(replicasIncreasedCh)
}()
r.NoError(waitFunc(ctx, deployName))
r.NoError(waitFunc(ctx, ns, deployName))
done()
}
30 changes: 17 additions & 13 deletions interceptor/main.go
@@ -34,9 +34,20 @@ func main() {
}
timeoutCfg := config.MustParseTimeouts()
servingCfg := config.MustParseServing()
if err := config.Validate(*servingCfg, *timeoutCfg); err != nil {
lggr.Error(err, "invalid configuration")
os.Exit(1)
}
ctx, ctxDone := context.WithCancel(
context.Background(),
)
lggr.Info(
"starting interceptor",
"timeoutConfig",
timeoutCfg,
"servingConfig",
servingCfg,
)

proxyPort := servingCfg.ProxyPort
adminPort := servingCfg.AdminPort
@@ -51,13 +62,10 @@ func main() {
lggr.Error(err, "creating new Kubernetes ClientSet")
os.Exit(1)
}
deployInterface := cl.AppsV1().Deployments(
servingCfg.CurrentNamespace,
)
deployCache, err := k8s.NewK8sDeploymentCache(
ctx,
deployCache := k8s.NewInformerBackedDeploymentCache(
lggr,
deployInterface,
cl,
time.Millisecond*time.Duration(servingCfg.DeploymentCachePollIntervalMS),
)
if err != nil {
lggr.Error(err, "creating new deployment cache")
@@ -66,7 +74,7 @@ func main() {

configMapsInterface := cl.CoreV1().ConfigMaps(servingCfg.CurrentNamespace)

waitFunc := newDeployReplicasForwardWaitFunc(deployCache)
waitFunc := newDeployReplicasForwardWaitFunc(lggr, deployCache)

lggr.Info("Interceptor starting")

@@ -101,11 +109,7 @@ func main() {
// start the deployment cache updater
errGrp.Go(func() error {
defer ctxDone()
err := deployCache.StartWatcher(
ctx,
lggr,
time.Duration(servingCfg.DeploymentCachePollIntervalMS)*time.Millisecond,
)
err := deployCache.Start(ctx)
lggr.Error(err, "deployment cache watcher failed")
return err
})
@@ -121,7 +125,6 @@ func main() {
configMapInformer,
servingCfg.CurrentNamespace,
routingTable,
q,
nil,
)
lggr.Error(err, "config map routing table updater failed")
@@ -246,6 +249,7 @@ func runProxyServer(
routingTable,
dialContextFunc,
waitFunc,
routing.ServiceURL,
newForwardingConfigFromTimeouts(timeouts),
),
)