diff --git a/injection/config.go b/injection/config.go new file mode 100644 index 0000000000..01a5962b7e --- /dev/null +++ b/injection/config.go @@ -0,0 +1,85 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package injection + +import ( + "errors" + "flag" + "log" + "os" + "os/user" + "path/filepath" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog" +) + +// ParseAndGetRESTConfigOrDie parses the rest config flags and creates a client or +// dies by calling log.Fatalf. +func ParseAndGetRESTConfigOrDie() *rest.Config { + var ( + serverURL = flag.String("server", "", + "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") + kubeconfig = flag.String("kubeconfig", "", + "Path to a kubeconfig. Only required if out-of-cluster.") + ) + klog.InitFlags(flag.CommandLine) + flag.Parse() + + cfg, err := GetRESTConfig(*serverURL, *kubeconfig) + if err != nil { + log.Fatal("Error building kubeconfig: ", err) + } + + return cfg +} + +// GetRESTConfig returns a rest.Config to be used for kubernetes client creation. +// It does so in the following order: +// 1. Use the passed kubeconfig/serverURL. +// 2. Fallback to the KUBECONFIG environment variable. +// 3. Fallback to in-cluster config. +// 4. Fallback to the ~/.kube/config. +func GetRESTConfig(serverURL, kubeconfig string) (*rest.Config, error) { + if kubeconfig == "" { + kubeconfig = os.Getenv("KUBECONFIG") + } + + // If we have an explicit indication of where the kubernetes config lives, read that. + if kubeconfig != "" { + c, err := clientcmd.BuildConfigFromFlags(serverURL, kubeconfig) + if err != nil { + return nil, err + } + return c, nil + } + + // If not, try the in-cluster config. + if c, err := rest.InClusterConfig(); err == nil { + return c, nil + } + + // If no in-cluster config, try the default location in the user's home directory. + if usr, err := user.Current(); err == nil { + if c, err := clientcmd.BuildConfigFromFlags("", filepath.Join(usr.HomeDir, ".kube", "config")); err == nil { + return c, nil + } + } + + return nil, errors.New("could not create a valid kubeconfig") +} diff --git a/injection/injection.go b/injection/injection.go new file mode 100644 index 0000000000..d8cefc9fbe --- /dev/null +++ b/injection/injection.go @@ -0,0 +1,66 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package injection + +import ( + "context" + + "go.uber.org/zap" + "k8s.io/client-go/rest" + + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + "knative.dev/pkg/signals" +) + +// EnableInjectionOrDie enables Knative Client Injection, and provides a +// callback to start the informers. Both Context and Config are optional. +// Returns context with rest config set and a callback to start the informers +// after watches have been set. +// +// Typical integration: +// ```go +// ctx, startInformers := injection.EnableInjectionOrDie(signals.NewContext(), nil) +// ... start watches with informers, if required ... +// startInformers() +// ``` +func EnableInjectionOrDie(ctx context.Context, cfg *rest.Config) (context.Context, func()) { + if ctx == nil { + ctx = signals.NewContext() + } + if cfg == nil { + cfg = ParseAndGetRESTConfigOrDie() + } + + // Respect user provided settings, but if omitted customize the default behavior. + if cfg.QPS == 0 { + cfg.QPS = rest.DefaultQPS + } + if cfg.Burst == 0 { + cfg.Burst = rest.DefaultBurst + } + ctx = WithConfig(ctx, cfg) + + ctx, informers := Default.SetupInformers(ctx, cfg) + + return ctx, func() { + logging.FromContext(ctx).Info("Starting informers...") + if err := controller.StartInformers(ctx.Done(), informers...); err != nil { + logging.FromContext(ctx).Fatalw("Failed to start informers", zap.Error(err)) + } + } +} diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 33d45391e2..c9dd034de2 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -18,16 +18,15 @@ package sharedmain import ( "context" - "errors" "flag" "log" "net/http" "os" - "os/user" - "path/filepath" "time" "go.opencensus.io/stats/view" + _ "go.uber.org/automaxprocs" // automatically set GOMAXPROCS based on cgroups + "go.uber.org/zap" "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -35,11 +34,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/klog" - - _ "go.uber.org/automaxprocs" // automatically set GOMAXPROCS based on cgroups - "go.uber.org/zap" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" @@ -63,37 +57,9 @@ import ( // 2. Fallback to the KUBECONFIG environment variable. // 3. Fallback to in-cluster config. // 4. Fallback to the ~/.kube/config. +// Deprecated: use injection.GetRESTConfig func GetConfig(serverURL, kubeconfig string) (*rest.Config, error) { - if kubeconfig == "" { - kubeconfig = os.Getenv("KUBECONFIG") - } - - // We produce configs a bunch of ways, this gives us a single place - // to "decorate" them with common useful things (e.g. for debugging) - decorate := func(cfg *rest.Config) *rest.Config { - return cfg - } - - // If we have an explicit indication of where the kubernetes config lives, read that. - if kubeconfig != "" { - c, err := clientcmd.BuildConfigFromFlags(serverURL, kubeconfig) - if err != nil { - return nil, err - } - return decorate(c), nil - } - // If not, try the in-cluster config. - if c, err := rest.InClusterConfig(); err == nil { - return decorate(c), nil - } - // If no in-cluster config, try the default location in the user's home directory. - if usr, err := user.Current(); err == nil { - if c, err := clientcmd.BuildConfigFromFlags("", filepath.Join(usr.HomeDir, ".kube", "config")); err == nil { - return decorate(c), nil - } - } - - return nil, errors.New("could not create a valid kubeconfig") + return injection.GetRESTConfig(serverURL, kubeconfig) } // GetLoggingConfig gets the logging config from either the file system if present @@ -129,40 +95,16 @@ func GetLeaderElectionConfig(ctx context.Context) (*leaderelection.Config, error // EnableInjectionOrDie enables Knative Injection and starts the informers. // Both Context and Config are optional. +// Deprecated: use injection.EnableInjectionOrDie func EnableInjectionOrDie(ctx context.Context, cfg *rest.Config) context.Context { - if ctx == nil { - ctx = signals.NewContext() - } - if cfg == nil { - cfg = ParseAndGetConfigOrDie() - } - - // Respect user provided settings, but if omitted customize the default behavior. - if cfg.QPS == 0 { - cfg.QPS = rest.DefaultQPS - } - if cfg.Burst == 0 { - cfg.Burst = rest.DefaultBurst - } - ctx = injection.WithConfig(ctx, cfg) - - ctx, informers := injection.Default.SetupInformers(ctx, cfg) - - // Start the injection clients and informers. - logging.FromContext(ctx).Info("Starting informers...") - go func(ctx context.Context) { - if err := controller.StartInformers(ctx.Done(), informers...); err != nil { - logging.FromContext(ctx).Fatalw("Failed to start informers", zap.Error(err)) - } - <-ctx.Done() - }(ctx) - + ctx, startInformers := injection.EnableInjectionOrDie(ctx, cfg) + go startInformers() return ctx } // Main runs the generic main flow with a new context. -// If any of the contructed controllers are AdmissionControllers or Conversion webhooks, -// then a webhook is started to serve them. +// If any of the constructed controllers are AdmissionControllers or Conversion +// webhooks, then a webhook is started to serve them. func Main(component string, ctors ...injection.ControllerConstructor) { // Set up signals so we handle the first shutdown signal gracefully. MainWithContext(signals.NewContext(), component, ctors...) @@ -185,7 +127,7 @@ func MainWithContext(ctx context.Context, component string, ctors ...injection.C "issue upstream!") // HACK: This parses flags, so the above should be set once this runs. - cfg := ParseAndGetConfigOrDie() + cfg := injection.ParseAndGetRESTConfigOrDie() if *disableHighAvailability { ctx = WithHADisabled(ctx) @@ -225,16 +167,7 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto cfg.Burst = len(ctors) * rest.DefaultBurst } - // Respect user provided settings, but if omitted customize the default behavior. - if cfg.QPS == 0 { - cfg.QPS = rest.DefaultQPS - } - if cfg.Burst == 0 { - cfg.Burst = rest.DefaultBurst - } - ctx = injection.WithConfig(ctx, cfg) - - ctx, informers := injection.Default.SetupInformers(ctx, cfg) + ctx, startInformers := injection.EnableInjectionOrDie(ctx, cfg) logger, atomicLevel := SetupLoggerOrDie(ctx, component) defer flush(logger) @@ -287,11 +220,10 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto return wh.Run(ctx.Done()) }) } + // Start the injection clients and informers. - logging.FromContext(ctx).Info("Starting informers...") - if err := controller.StartInformers(ctx.Done(), informers...); err != nil { - logging.FromContext(ctx).Fatalw("Failed to start informers", zap.Error(err)) - } + startInformers() + // Wait for webhook informers to sync. if wh != nil { wh.InformersHaveSynced() @@ -317,22 +249,9 @@ func flush(logger *zap.SugaredLogger) { // ParseAndGetConfigOrDie parses the rest config flags and creates a client or // dies by calling log.Fatalf. +// Deprecated: use injeciton.ParseAndGetRESTConfigOrDie func ParseAndGetConfigOrDie() *rest.Config { - var ( - serverURL = flag.String("server", "", - "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") - kubeconfig = flag.String("kubeconfig", "", - "Path to a kubeconfig. Only required if out-of-cluster.") - ) - klog.InitFlags(flag.CommandLine) - flag.Parse() - - cfg, err := GetConfig(*serverURL, *kubeconfig) - if err != nil { - log.Fatal("Error building kubeconfig: ", err) - } - - return cfg + return injection.ParseAndGetRESTConfigOrDie() } // MemStatsOrDie sets up reporting on Go memory usage every 30 seconds or dies diff --git a/leaderelection/chaosduck/main.go b/leaderelection/chaosduck/main.go index d4950b911b..2ad685fe31 100644 --- a/leaderelection/chaosduck/main.go +++ b/leaderelection/chaosduck/main.go @@ -27,13 +27,14 @@ import ( "strings" "time" + "knative.dev/pkg/injection" + "golang.org/x/sync/errgroup" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/injection/sharedmain" "knative.dev/pkg/kflag" "knative.dev/pkg/signals" "knative.dev/pkg/system" @@ -116,8 +117,7 @@ func quack(ctx context.Context, kc kubernetes.Interface, component string, leade } func main() { - ctx := signals.NewContext() - ctx = sharedmain.EnableInjectionOrDie(ctx, nil) + ctx, _ := injection.EnableInjectionOrDie(signals.NewContext(), nil) kc := kubeclient.Get(ctx)