Skip to content

Commit

Permalink
feat: enable controller HA (#602)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Mar 13, 2023
1 parent 23d2a05 commit 4cdef17
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 7 deletions.
19 changes: 19 additions & 0 deletions config/advanced-install/namespaced-controller-wo-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,25 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@ rules:
- vertices/finalizers
- vertices/status
- vertices/scale
- apiGroups:
- ""
resources:
- "events"
verbs:
- "create"
- "patch"
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
resources:
Expand Down
19 changes: 19 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11489,6 +11489,25 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
resources:
Expand Down
19 changes: 19 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11402,6 +11402,25 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@ rules:
- vertices/finalizers
- vertices/status
- vertices/scale
- apiGroups:
- ""
resources:
- "events"
verbs:
- "create"
- "patch"
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
resources:
Expand Down
13 changes: 13 additions & 0 deletions docs/operations/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,16 @@ To do managed namespace installation, besides `--namespaced`, add `--managed-nam
- --managed-namespace
- my-namespace
```

# High Availability

By default, the Numaflow controller is installed with the `Active-Passive` HA strategy enabled, which means you can run the controller with multiple replicas (the manifests default to 1 replica).

To turn off HA, add the following environment variable to the deployment spec.

```
name: NUMAFLOW_LEADER_ELECTION_DISABLED
value: "true"
```

If HA is turned off, the controller deployment should not run with multiple replicas.
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ const (
EnvISBSvcJetStreamURL = "NUMAFLOW_ISBSVC_JETSTREAM_URL"
EnvISBSvcJetStreamTLSEnabled = "NUMAFLOW_ISBSVC_JETSTREAM_TLS_ENABLED"
EnvISBSvcConfig = "NUMAFLOW_ISBSVC_CONFIG"
EnvLeaderElectionDisabled = "NUMAFLOW_LEADER_ELECTION_DISABLED"
EnvDebug = "NUMAFLOW_DEBUG"
EnvPPROF = "NUMAFLOW_PPROF"
EnvHealthCheckDisabled = "NUMAFLOW_HEALTH_CHECK_DISABLED"
Expand Down
31 changes: 28 additions & 3 deletions pkg/reconciler/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cmd

import (
"context"
"reflect"

numaflow "github.com/numaproj/numaflow"
Expand All @@ -37,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand All @@ -58,7 +60,14 @@ func Start(namespaced bool, managedNamespace string) {
opts := ctrl.Options{
MetricsBindAddress: ":9090",
HealthProbeBindAddress: ":8081",
LeaderElection: true,
LeaderElectionID: "numaflow-controller-lock",
}

if sharedutil.LookupEnvStringOr(dfv1.EnvLeaderElectionDisabled, "false") == "true" {
opts.LeaderElection = false
}

if namespaced {
opts.Namespace = managedNamespace
}
Expand Down Expand Up @@ -184,11 +193,27 @@ func Start(namespaced bool, managedNamespace string) {
logger.Fatalw("Unable to watch Services", zap.Error(err))
}

ctx := ctrl.SetupSignalHandler()
go autoscaler.Start(logging.WithLogger(ctx, logging.NewLogger().Named("autoscaler")))
// Add autoscaling runner
if err := mgr.Add(LeaderElectionRunner(autoscaler.Start)); err != nil {
logger.Fatalw("Unable to add autoscaling runner", zap.Error(err))
}

logger.Infow("Starting controller manager", "version", numaflow.GetVersion())
if err := mgr.Start(ctx); err != nil {
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
logger.Fatalw("Unable to run controller manager", zap.Error(err))
}
}

// LeaderElectionRunner adapts a plain func(ctx) error so that it exposes the
// Start and NeedLeaderElection methods, allowing it to be registered as a
// runnable that takes part in leader election.
type LeaderElectionRunner func(ctx context.Context) error

// NeedLeaderElection always reports true, marking the wrapped function as one
// that requires leader election.
func (r LeaderElectionRunner) NeedLeaderElection() bool { return true }

// Start calls the wrapped function with ctx and returns whatever error it
// returns.
func (r LeaderElectionRunner) Start(ctx context.Context) error {
	run := (func(context.Context) error)(r)
	return run(ctx)
}

// Compile-time assertions that *LeaderElectionRunner satisfies both
// manager.Runnable and manager.LeaderElectionRunnable.
var _ manager.Runnable = (*LeaderElectionRunner)(nil)
var _ manager.LeaderElectionRunnable = (*LeaderElectionRunner)(nil)
8 changes: 4 additions & 4 deletions pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,11 @@ func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, rate
// Start function starts the autoscaling worker group.
// Each worker keeps picking up scaling tasks (which contains vertex keys) to calculate the desired replicas,
// and patch the vertex spec with the new replica number if needed.
func (s *Scaler) Start(ctx context.Context) {
log := logging.FromContext(ctx)
func (s *Scaler) Start(ctx context.Context) error {
log := logging.FromContext(ctx).Named("autoscaler")
log.Info("Starting autoscaler...")
keyCh := make(chan string)
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(logging.WithLogger(ctx, log))
defer cancel()
// Worker group
for i := 1; i <= s.options.workers; i++ {
Expand Down Expand Up @@ -356,7 +356,7 @@ func (s *Scaler) Start(ctx context.Context) {
select {
case <-ctx.Done():
log.Info("Shutting down scaling job assigner")
return
return nil
default:
assign()
}
Expand Down
5 changes: 5 additions & 0 deletions test/manifests/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ patches:
value:
name: NUMAFLOW_DEBUG
value: "true"
- op: add
path: /spec/template/spec/containers/0/env/-
value:
name: NUMAFLOW_LEADER_ELECTION_DISABLED
value: "true"
target:
kind: Deployment
name: numaflow-controller
Expand Down

0 comments on commit 4cdef17

Please sign in to comment.