Merge pull request #4243 from inteon/improved_go_routines
Cleanup goroutine management
jetstack-bot committed Jul 28, 2021
2 parents 3b50d78 + 78d1378 commit d062176
Showing 13 changed files with 312 additions and 268 deletions.
11 changes: 10 additions & 1 deletion cmd/acmesolver/app/app.go
@@ -18,6 +18,7 @@ package app
 
 import (
     "context"
+    "time"
 
     "github.com/spf13/cobra"
 
@@ -37,9 +38,15 @@ func NewACMESolverCommand(stopCh <-chan struct{}) *cobra.Command {
             rootCtx = logf.NewContext(rootCtx, nil, "acmesolver")
             log := logf.FromContext(rootCtx)
 
+            completedCh := make(chan struct{})
             go func() {
+                defer close(completedCh)
                 <-stopCh
-                if err := s.Shutdown(rootCtx); err != nil {
+                // allow a timeout for graceful shutdown
+                ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+                defer cancel()
+
+                if err := s.Shutdown(ctx); err != nil {
                     log.Error(err, "error shutting down acmesolver server")
                 }
             }()
@@ -48,6 +55,8 @@ func NewACMESolverCommand(stopCh <-chan struct{}) *cobra.Command {
                 return err
             }
 
+            <-completedCh
+
             return nil
         },
     }
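The completedCh handshake above is the heart of this change: previously RunE could return while the shutdown goroutine was still running. A minimal, self-contained sketch of the same pattern, with a plain net/http server standing in for the acmesolver server (serve, srv, stopCh and the timings are illustrative names, not cert-manager API):

package main

import (
    "context"
    "log"
    "net/http"
    "time"
)

func serve(srv *http.Server, stopCh <-chan struct{}) error {
    completedCh := make(chan struct{})
    go func() {
        defer close(completedCh)
        <-stopCh // wait for the shutdown signal

        // Use a fresh context with a timeout: the caller's context may
        // already be cancelled by the time we get here.
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := srv.Shutdown(ctx); err != nil {
            log.Printf("error shutting down server: %v", err)
        }
    }()

    if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
        return err
    }
    // Without this receive, serve could return while Shutdown is still
    // draining in-flight connections.
    <-completedCh
    return nil
}

func main() {
    stopCh := make(chan struct{})
    time.AfterFunc(time.Second, func() { close(stopCh) })
    log.Println(serve(&http.Server{Addr: "127.0.0.1:8080"}, stopCh))
}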
29 changes: 14 additions & 15 deletions cmd/cainjector/app/start.go
@@ -117,15 +117,16 @@ servers and webhook servers.`,
 
 func (o InjectorControllerOptions) RunInjectorController(ctx context.Context) error {
     mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
-        Scheme:                  api.Scheme,
-        Namespace:               o.Namespace,
-        LeaderElection:          o.LeaderElect,
-        LeaderElectionNamespace: o.LeaderElectionNamespace,
-        LeaderElectionID:        "cert-manager-cainjector-leader-election",
-        LeaseDuration:           &o.LeaseDuration,
-        RenewDeadline:           &o.RenewDeadline,
-        RetryPeriod:             &o.RetryPeriod,
-        MetricsBindAddress:      "0",
+        Scheme:                        api.Scheme,
+        Namespace:                     o.Namespace,
+        LeaderElection:                o.LeaderElect,
+        LeaderElectionNamespace:       o.LeaderElectionNamespace,
+        LeaderElectionID:              "cert-manager-cainjector-leader-election",
+        LeaderElectionReleaseOnCancel: true,
+        LeaseDuration:                 &o.LeaseDuration,
+        RenewDeadline:                 &o.RenewDeadline,
+        RetryPeriod:                   &o.RetryPeriod,
+        MetricsBindAddress:            "0",
     })
     if err != nil {
         return fmt.Errorf("error creating manager: %v", err)
@@ -144,14 +145,12 @@ func (o InjectorControllerOptions) RunInjectorController(ctx context.Context) er
         return nil
     })
 
-    // Don't launch the controllers unless we have been elected leader
-    <-mgr.Elected()
-
-    // Exit early if the Elected channel gets closed because we are shutting down.
     select {
-    case <-gctx.Done():
+    case <-gctx.Done(): // Exit early if we are shutting down or if the manager has exited with an error
+        // Wait for error group to complete and return
         return g.Wait()
-    default:
+    case <-mgr.Elected(): // Don't launch the controllers unless we have been elected leader
+        // Continue with setting up controller
     }
 
     // Retry the start up of the certificate based controller in case the
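The two-way select above replaces an unconditional <-mgr.Elected(): before this change, a shutdown signal arriving before leadership was won had no clean exit path. A sketch of the wait, with g and elected standing in for the error group and mgr.Elected() (names are illustrative):

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

// waitForLeadership returns nil once leadership is acquired, or the error
// group's result if the shared context is cancelled first.
func waitForLeadership(ctx context.Context, g *errgroup.Group, elected <-chan struct{}) error {
    select {
    case <-ctx.Done():
        // Shutting down, or a goroutine in the group already failed:
        // drain the group and surface its error instead of starting
        // the controllers.
        return g.Wait()
    case <-elected:
        // We are the leader; the caller continues with setup.
        return nil
    }
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())
    elected := make(chan struct{})
    time.AfterFunc(10*time.Millisecond, func() { close(elected) }) // pretend we won
    fmt.Println(waitForLeadership(ctx, g, elected))                // <nil>
}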
1 change: 1 addition & 0 deletions cmd/controller/app/BUILD.bazel
@@ -53,6 +53,7 @@ go_library(
         "@io_k8s_sigs_gateway_api//pkg/client/clientset/versioned/scheme:go_default_library",
         "@io_k8s_sigs_gateway_api//pkg/client/informers/externalversions:go_default_library",
         "@io_k8s_utils//clock:go_default_library",
+        "@org_golang_x_sync//errgroup:go_default_library",
     ],
 )

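The new @org_golang_x_sync//errgroup dependency is what the rewritten controller builds on. The property it relies on, sketched below: errgroup.WithContext cancels the derived context as soon as any goroutine returns an error, and Wait reports that first error.

package main

import (
    "context"
    "errors"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    g.Go(func() error {
        return errors.New("boom") // the first failure cancels ctx ...
    })
    g.Go(func() error {
        <-ctx.Done() // ... which unblocks every sibling goroutine
        return ctx.Err()
    })

    fmt.Println(g.Wait()) // prints "boom", the first non-nil error
}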
209 changes: 132 additions & 77 deletions cmd/controller/app/controller.go
@@ -18,14 +18,18 @@ package app
 
 import (
     "context"
+    "errors"
     "fmt"
+    "net"
+    "net/http"
     "os"
-    "sync"
     "time"
 
+    "golang.org/x/sync/errgroup"
     corev1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
+    utilerrors "k8s.io/apimachinery/pkg/util/errors"
     kubeinformers "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/scheme"
@@ -63,89 +67,144 @@ const controllerAgentName = "cert-manager"
 //and following discussion: https://github.com/kubernetes-sigs/controller-runtime/pull/88#issuecomment-408500629
 const resyncPeriod = 10 * time.Hour
 
-func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) {
+func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error {
     rootCtx := cmdutil.ContextWithStopCh(context.Background(), stopCh)
+    rootCtx, cancelContext := context.WithCancel(rootCtx)
+    defer cancelContext()
+    g, rootCtx := errgroup.WithContext(rootCtx)
     rootCtx = logf.NewContext(rootCtx, nil, "controller")
     log := logf.FromContext(rootCtx)
 
-    ctx, kubeCfg, err := buildControllerContext(rootCtx, stopCh, opts)
+    ctx, kubeCfg, err := buildControllerContext(rootCtx, opts)
     if err != nil {
-        log.Error(err, "error building controller context", "options", opts)
-        os.Exit(1)
+        return fmt.Errorf("error building controller context (options %v): %v", opts, err)
     }
 
     enabledControllers := opts.EnabledControllers()
     log.Info(fmt.Sprintf("enabled controllers: %s", enabledControllers.List()))
 
-    metricsServer, err := ctx.Metrics.Start(opts.MetricsListenAddress, opts.EnablePprof)
+    ln, err := net.Listen("tcp", opts.MetricsListenAddress)
     if err != nil {
-        log.Error(err, "failed to listen on prometheus address", "address", opts.MetricsListenAddress)
-        os.Exit(1)
+        return fmt.Errorf("failed to listen on prometheus address %s: %v", opts.MetricsListenAddress, err)
     }
+    server := ctx.Metrics.NewServer(ln, opts.EnablePprof)
 
-    var wg sync.WaitGroup
-    run := func(_ context.Context) {
-        for n, fn := range controller.Known() {
-            log := log.WithValues("controller", n)
+    g.Go(func() error {
+        <-rootCtx.Done()
+        // allow a timeout for graceful shutdown
+        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+        defer cancel()
 
-            // only run a controller if it's been enabled
-            if !enabledControllers.Has(n) {
-                log.V(logf.InfoLevel).Info("not starting controller as it's disabled")
-                continue
-            }
+        if err := server.Shutdown(ctx); err != nil {
+            return err
+        }
+        return nil
+    })
+    g.Go(func() error {
+        log.V(logf.InfoLevel).Info("starting metrics server", "address", ln.Addr())
+        if err := server.Serve(ln); err != http.ErrServerClosed {
+            return err
+        }
+        return nil
+    })
 
-            // don't run clusterissuers controller if scoped to a single namespace
-            if ctx.Namespace != "" && n == clusterissuers.ControllerName {
-                log.V(logf.InfoLevel).Info("not starting controller as cert-manager has been scoped to a single namespace")
-                continue
-            }
+    elected := make(chan struct{})
+    if opts.LeaderElect {
+        g.Go(func() error {
+            log.V(logf.InfoLevel).Info("starting leader election")
+            leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election"))
+            if err != nil {
+                return fmt.Errorf("error creating leader election client: %v", err)
+            }
 
-            wg.Add(1)
-            iface, err := fn(ctx)
-            if err != nil {
-                log.Error(err, "error starting controller")
-                os.Exit(1)
-            }
-            go func(n string, fn controller.Interface) {
-                defer wg.Done()
-                log.V(logf.InfoLevel).Info("starting controller")
+            errorCh := make(chan error, 1)
+            if err := startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, leaderelection.LeaderCallbacks{
+                OnStartedLeading: func(_ context.Context) {
+                    close(elected)
+                },
+                OnStoppedLeading: func() {
+                    select {
+                    case <-rootCtx.Done():
+                        // context was canceled, just return
+                        return
+                    default:
+                        errorCh <- errors.New("leader election lost")
+                    }
+                },
+            }); err != nil {
+                return err
+            }
 
-                workers := 5
-                err := fn.Run(workers, stopCh)
-
-                if err != nil {
-                    log.Error(err, "error starting controller")
-                    os.Exit(1)
-                }
-            }(n, iface)
-        }
+            select {
+            case err := <-errorCh:
+                return err
+            default:
+                return nil
+            }
+        })
+    } else {
+        close(elected)
+    }
 
-        log.V(logf.DebugLevel).Info("starting shared informer factories")
-        ctx.SharedInformerFactory.Start(stopCh)
-        ctx.KubeSharedInformerFactory.Start(stopCh)
-        ctx.GWShared.Start(stopCh)
-        wg.Wait()
-        log.V(logf.InfoLevel).Info("control loops exited")
-        ctx.Metrics.Shutdown(metricsServer)
-        os.Exit(0)
-    }
+    select {
+    case <-rootCtx.Done(): // Exit early if we are shutting down or if the errgroup has already exited with an error
+        // Wait for error group to complete and return
+        return g.Wait()
+    case <-elected: // Don't launch the controllers unless we have been elected leader
+        // Continue with setting up controller
+    }
 
-    if !opts.LeaderElect {
-        run(context.TODO())
-        return
-    }
+    for n, fn := range controller.Known() {
+        log := log.WithValues("controller", n)
+
+        // only run a controller if it's been enabled
+        if !enabledControllers.Has(n) {
+            log.V(logf.InfoLevel).Info("not starting controller as it's disabled")
+            continue
+        }
 
-    log.V(logf.InfoLevel).Info("starting leader election")
-    leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election"))
+        // don't run clusterissuers controller if scoped to a single namespace
+        if ctx.Namespace != "" && n == clusterissuers.ControllerName {
+            log.V(logf.InfoLevel).Info("not starting controller as cert-manager has been scoped to a single namespace")
+            continue
+        }
+
+        iface, err := fn(ctx)
+        if err != nil {
+            err = fmt.Errorf("error starting controller: %v", err)
+
+            cancelContext()
+            err2 := g.Wait() // Don't process errors, we already have an error
+            if err2 != nil {
+                return utilerrors.NewAggregate([]error{err, err2})
+            }
+            return err
+        }
+
+        g.Go(func() error {
+            log.V(logf.InfoLevel).Info("starting controller")
+
+            // TODO: make this either a constant or a command line flag
+            workers := 5
+            return iface.Run(workers, rootCtx.Done())
+        })
+    }
+
+    log.V(logf.DebugLevel).Info("starting shared informer factories")
+    ctx.SharedInformerFactory.Start(rootCtx.Done())
+    ctx.KubeSharedInformerFactory.Start(rootCtx.Done())
+    ctx.GWShared.Start(rootCtx.Done())
+
+    err = g.Wait()
     if err != nil {
-        log.Error(err, "error creating leader election client")
-        os.Exit(1)
+        return fmt.Errorf("error starting controller: %v", err)
     }
+    log.V(logf.InfoLevel).Info("control loops exited")
 
-    startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, run)
+    return nil
 }
 
-func buildControllerContext(ctx context.Context, stopCh <-chan struct{}, opts *options.ControllerOptions) (*controller.Context, *rest.Config, error) {
+func buildControllerContext(ctx context.Context, opts *options.ControllerOptions) (*controller.Context, *rest.Config, error) {
     log := logf.FromContext(ctx, "build-context")
     // Load the users Kubernetes config
     kubeCfg, err := clientcmd.BuildConfigFromFlags(opts.APIServerHost, opts.Kubeconfig)
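Each long-lived resource in the rewritten Run is registered as a pair of errgroup goroutines: one serves, and one waits for cancellation and performs a bounded graceful shutdown. A self-contained sketch of that pairing for a metrics-style HTTP server (the address and 5-second budget are illustrative; the real code wires the listener into ctx.Metrics.NewServer):

package main

import (
    "context"
    "fmt"
    "net"
    "net/http"
    "time"

    "golang.org/x/sync/errgroup"
)

func runServer(rootCtx context.Context, g *errgroup.Group, addr string) error {
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return fmt.Errorf("failed to listen on %s: %v", addr, err)
    }
    server := &http.Server{Handler: http.DefaultServeMux}

    g.Go(func() error {
        <-rootCtx.Done()
        // Give in-flight requests up to 5s to finish.
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        return server.Shutdown(ctx)
    })
    g.Go(func() error {
        // Shutdown makes Serve return ErrServerClosed, which is not a failure.
        if err := server.Serve(ln); err != http.ErrServerClosed {
            return err
        }
        return nil
    })
    return nil
}

func main() {
    rootCtx, cancel := context.WithCancel(context.Background())
    g, rootCtx := errgroup.WithContext(rootCtx)
    if err := runServer(rootCtx, g, "127.0.0.1:9402"); err != nil {
        panic(err)
    }
    time.AfterFunc(100*time.Millisecond, cancel) // simulate a stop signal
    fmt.Println(g.Wait())                        // <nil> on a clean shutdown
}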
@@ -238,7 +297,7 @@ func buildControllerContext(ctx context.Context, stopCh <-chan struct{}, opts *o
 
     return &controller.Context{
         RootContext:           ctx,
-        StopCh:                stopCh,
+        StopCh:                ctx.Done(),
         RESTConfig:            kubeCfg,
         Client:                cl,
         CMClient:              intcl,
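Swapping stopCh for ctx.Done() works because Done() returns the same channel shape (<-chan struct{}) that StopCh already had, so consumers are unchanged while context cancellation becomes the single shutdown signal:

package main

import (
    "context"
    "fmt"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // ctx.Done() is a plain <-chan struct{}, so it can be stored
    // anywhere a stop channel used to live.
    var stopCh <-chan struct{} = ctx.Done()

    cancel()
    <-stopCh // closed by the cancellation
    fmt.Println("stopped")
}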
@@ -283,14 +342,11 @@ func buildControllerContext(ctx context.Context, stopCh <-chan struct{}, opts *o
     }, kubeCfg, nil
 }
 
-func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, run func(context.Context)) {
-    log := logf.FromContext(ctx, "leader-election")
-
+func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, callbacks leaderelection.LeaderCallbacks) error {
     // Identity used to distinguish between multiple controller manager instances
     id, err := os.Hostname()
     if err != nil {
-        log.Error(err, "error getting hostname")
-        os.Exit(1)
+        return fmt.Errorf("error getting hostname: %v", err)
     }
 
     // Set up Multilock for leader election. This Multilock is here for the
@@ -309,24 +365,23 @@ func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, l
         lc,
     )
     if err != nil {
-        // We should never get here.
-        log.Error(err, "error creating leader election lock")
-        os.Exit(1)
-
+        return fmt.Errorf("error creating leader election lock: %v", err)
     }
 
     // Try and become the leader and start controller manager loops
-    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
-        Lock:          ml,
-        LeaseDuration: opts.LeaderElectionLeaseDuration,
-        RenewDeadline: opts.LeaderElectionRenewDeadline,
-        RetryPeriod:   opts.LeaderElectionRetryPeriod,
-        Callbacks: leaderelection.LeaderCallbacks{
-            OnStartedLeading: run,
-            OnStoppedLeading: func() {
-                log.V(logf.ErrorLevel).Info("leader election lost")
-                os.Exit(1)
-            },
-        },
+    le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
+        Lock:            ml,
+        LeaseDuration:   opts.LeaderElectionLeaseDuration,
+        RenewDeadline:   opts.LeaderElectionRenewDeadline,
+        RetryPeriod:     opts.LeaderElectionRetryPeriod,
+        ReleaseOnCancel: true,
+        Callbacks:       callbacks,
     })
+    if err != nil {
+        return err
+    }
+
+    le.Run(ctx)
+
+    return nil
 }
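startLeaderElection now returns errors and takes its callbacks as a parameter, and ReleaseOnCancel means a cancelled context releases the lease instead of letting it expire. A hedged sketch of the same shape against the client-go API, using a single LeaseLock rather than cert-manager's MultiLock (the lease name, namespace and timings below are placeholders):

package sketch

import (
    "context"
    "fmt"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// startLeaderElection mirrors the refactored helper above: callbacks are
// injected by the caller, failures are returned instead of calling os.Exit,
// and ReleaseOnCancel lets a cancelled context hand off the lock early.
func startLeaderElection(ctx context.Context, client kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
    id, err := os.Hostname()
    if err != nil {
        return fmt.Errorf("error getting hostname: %v", err)
    }

    lock := &resourcelock.LeaseLock{
        LeaseMeta:  metav1.ObjectMeta{Name: "example-lock", Namespace: "default"},
        Client:     client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: id},
    }

    le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
        Lock:            lock,
        LeaseDuration:   15 * time.Second,
        RenewDeadline:   10 * time.Second,
        RetryPeriod:     2 * time.Second,
        ReleaseOnCancel: true, // release the lease when ctx is cancelled
        Callbacks:       callbacks,
    })
    if err != nil {
        return err
    }

    le.Run(ctx) // blocks until ctx is cancelled or the lease is lost
    return nil
}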
11 changes: 8 additions & 3 deletions cmd/controller/app/start.go
@@ -73,9 +73,14 @@ to renew certificates at an appropriate time before expiry.`,
             }
 
             logf.Log.V(logf.InfoLevel).Info("starting controller", "version", util.AppVersion, "git-commit", util.AppGitCommit)
-            o.RunCertManagerController(stopCh)
+            if err := o.RunCertManagerController(stopCh); err != nil {
+                cmd.SilenceUsage = true // Don't display usage information when exiting because of an error
+                return err
+            }
+
             return nil
         },
+        SilenceErrors: true, // Errors are already logged when calling cmd.Execute()
     }
 
     flags := cmd.Flags()
@@ -91,6 +96,6 @@ func (o CertManagerControllerOptions) Validate(args []string) error {
     return utilerrors.NewAggregate(errors)
 }
 
-func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) {
-    Run(o.ControllerOptions, stopCh)
+func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) error {
+    return Run(o.ControllerOptions, stopCh)
 }
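The SilenceUsage/SilenceErrors split above is the standard cobra idiom once RunE returns errors: usage help still prints for flag mistakes, but a runtime failure prints neither the help text nor a second copy of the error. A minimal sketch (the command name is hypothetical):

package main

import (
    "errors"
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

func main() {
    cmd := &cobra.Command{
        Use: "example",
        RunE: func(cmd *cobra.Command, args []string) error {
            // Flag and argument problems before this point still print usage.
            cmd.SilenceUsage = true // runtime errors should not print usage
            return errors.New("something failed at runtime")
        },
        // The caller logs the returned error itself, so don't let cobra
        // print it a second time.
        SilenceErrors: true,
    }

    if err := cmd.Execute(); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
}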
