Skip to content

Commit

Permalink
move enable injeciton out of sharedmain
Browse files Browse the repository at this point in the history
  • Loading branch information
Scott Nichols committed Oct 6, 2020
1 parent 11f5e2d commit 7dfb20f
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 45 deletions.
88 changes: 88 additions & 0 deletions injection/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package injection

import (
"errors"
"flag"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
"log"
"os"
"os/user"
"path/filepath"
)

// ParseAndGetRestConfigOrDie parses the rest config flags and creates a client or
// dies by calling log.Fatalf.
func ParseAndGetRestConfigOrDie() *rest.Config {
var (
serverURL = flag.String("server", "",
"The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "",
"Path to a kubeconfig. Only required if out-of-cluster.")
)
klog.InitFlags(flag.CommandLine)
flag.Parse()

cfg, err := GetRestConfig(*serverURL, *kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}

return cfg
}

// GetConfig returns a rest.Config to be used for kubernetes client creation.
// It does so in the following order:
// 1. Use the passed kubeconfig/serverURL.
// 2. Fallback to the KUBECONFIG environment variable.
// 3. Fallback to in-cluster config.
// 4. Fallback to the ~/.kube/config.
func GetRestConfig(serverURL, kubeconfig string) (*rest.Config, error) {
if kubeconfig == "" {
kubeconfig = os.Getenv("KUBECONFIG")
}

// We produce configs a bunch of ways, this gives us a single place
// to "decorate" them with common useful things (e.g. for debugging)
decorate := func(cfg *rest.Config) *rest.Config {
return cfg
}

// If we have an explicit indication of where the kubernetes config lives, read that.
if kubeconfig != "" {
c, err := clientcmd.BuildConfigFromFlags(serverURL, kubeconfig)
if err != nil {
return nil, err
}
return decorate(c), nil
}
// If not, try the in-cluster config.
if c, err := rest.InClusterConfig(); err == nil {
return decorate(c), nil
}
// If no in-cluster config, try the default location in the user's home directory.
if usr, err := user.Current(); err == nil {
if c, err := clientcmd.BuildConfigFromFlags("", filepath.Join(usr.HomeDir, ".kube", "config")); err == nil {
return decorate(c), nil
}
}

return nil, errors.New("could not create a valid kubeconfig")
}
1 change: 0 additions & 1 deletion injection/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package injection

import (
"context"

"k8s.io/client-go/rest"
)

Expand Down
57 changes: 57 additions & 0 deletions injection/injection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package injection

import (
"context"
"go.uber.org/zap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"

"k8s.io/client-go/rest"
)

// EnableInjectionOrDie enables Knative Injection and starts the informers.
// Both Context and Config are optional. Returns context with rest config set
// and a function to start the informers after watches have been set up.
func EnableInjectionOrDie(ctx context.Context, cfg *rest.Config) (context.Context, func()) {
if ctx == nil {
ctx = signals.NewContext()
}
if cfg == nil {
cfg = ParseAndGetRestConfigOrDie()
}

// Respect user provided settings, but if omitted customize the default behavior.
if cfg.QPS == 0 {
cfg.QPS = rest.DefaultQPS
}
if cfg.Burst == 0 {
cfg.Burst = rest.DefaultBurst
}
ctx = WithConfig(ctx, cfg)

ctx, informers := Default.SetupInformers(ctx, cfg)

return ctx, func() {
logging.FromContext(ctx).Info("Starting informers...")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logging.FromContext(ctx).Fatalw("Failed to start informers", zap.Error(err))
}
}
}
51 changes: 10 additions & 41 deletions injection/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"os"
"os/user"
"path/filepath"
"reflect"
"time"

"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -64,6 +63,7 @@ import (
// 2. Fallback to the KUBECONFIG environment variable.
// 3. Fallback to in-cluster config.
// 4. Fallback to the ~/.kube/config.
// Deprecated: use injection.GetRestConfig
func GetConfig(serverURL, kubeconfig string) (*rest.Config, error) {
if kubeconfig == "" {
kubeconfig = os.Getenv("KUBECONFIG")
Expand Down Expand Up @@ -128,40 +128,15 @@ func GetLeaderElectionConfig(ctx context.Context) (*leaderelection.Config, error
return leaderelection.NewConfigFromConfigMap(leaderElectionConfigMap)
}

const (
defaultStartInformerDelayTime = 5 * time.Second
)

type informerStartChanKey struct{}
type informerStartTimeoutKey struct{}

func WithInformerStart(ctx context.Context, start <-chan struct{}, timeout time.Duration) context.Context {
ctx = context.WithValue(ctx, informerStartChanKey{}, start)
return context.WithValue(ctx, informerStartTimeoutKey{}, timeout)
}

func getInformerStartChanTimeout(ctx context.Context) (<-chan struct{}, time.Duration) {
timeout := defaultStartInformerDelayTime
tuntyped := ctx.Value(informerStartTimeoutKey{})
if tuntyped != nil {
timeout = tuntyped.(time.Duration)
}

untyped := ctx.Value(informerStartChanKey{})
if untyped == nil || reflect.ValueOf(untyped).IsNil() {
return make(chan struct{}, 1), timeout
}
return untyped.(<-chan struct{}), timeout
}

// EnableInjectionOrDie enables Knative Injection and starts the informers.
// Both Context and Config are optional.
// Deprecated: use injection.EnableInjectionOrDie
func EnableInjectionOrDie(ctx context.Context, cfg *rest.Config) context.Context {
if ctx == nil {
ctx = signals.NewContext()
}
if cfg == nil {
cfg = ParseAndGetConfigOrDie()
cfg = injection.ParseAndGetRestConfigOrDie()
}

// Respect user provided settings, but if omitted customize the default behavior.
Expand All @@ -175,13 +150,7 @@ func EnableInjectionOrDie(ctx context.Context, cfg *rest.Config) context.Context

ctx, informers := injection.Default.SetupInformers(ctx, cfg)

start, timeout := getInformerStartChanTimeout(ctx)
go func() {
// Block until the timeout or we are told it is ok to start informers.
select {
case <-start:
case <-time.After(timeout):
}
logging.FromContext(ctx).Info("Starting informers...")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logging.FromContext(ctx).Fatalw("Failed to start informers", zap.Error(err))
Expand All @@ -192,8 +161,8 @@ func EnableInjectionOrDie(ctx context.Context, cfg *rest.Config) context.Context
}

// Main runs the generic main flow with a new context.
// If any of the contructed controllers are AdmissionControllers or Conversion webhooks,
// then a webhook is started to serve them.
// If any of the constructed controllers are AdmissionControllers or Conversion
// webhooks, then a webhook is started to serve them.
func Main(component string, ctors ...injection.ControllerConstructor) {
// Set up signals so we handle the first shutdown signal gracefully.
MainWithContext(signals.NewContext(), component, ctors...)
Expand All @@ -216,7 +185,7 @@ func MainWithContext(ctx context.Context, component string, ctors ...injection.C
"issue upstream!")

// HACK: This parses flags, so the above should be set once this runs.
cfg := ParseAndGetConfigOrDie()
cfg := injection.ParseAndGetRestConfigOrDie()

if *disableHighAvailability {
ctx = WithHADisabled(ctx)
Expand Down Expand Up @@ -256,8 +225,7 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto
cfg.Burst = len(ctors) * rest.DefaultBurst
}

startCh := make(chan struct{}, 1)
ctx = EnableInjectionOrDie(WithInformerStart(ctx, startCh, defaultStartInformerDelayTime), cfg)
ctx, startInformers := injection.EnableInjectionOrDie(ctx, cfg)

logger, atomicLevel := SetupLoggerOrDie(ctx, component)
defer flush(logger)
Expand Down Expand Up @@ -312,7 +280,7 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto
}

// Start the injection clients and informers.
startCh <- struct{}{}
startInformers()

// Wait for webhook informers to sync.
if wh != nil {
Expand All @@ -339,6 +307,7 @@ func flush(logger *zap.SugaredLogger) {

// ParseAndGetConfigOrDie parses the rest config flags and creates a client or
// dies by calling log.Fatalf.
// Deprecated: use injeciton.ParseAndGetRestConfigOrDie
func ParseAndGetConfigOrDie() *rest.Config {
var (
serverURL = flag.String("server", "",
Expand All @@ -349,7 +318,7 @@ func ParseAndGetConfigOrDie() *rest.Config {
klog.InitFlags(flag.CommandLine)
flag.Parse()

cfg, err := GetConfig(*serverURL, *kubeconfig)
cfg, err := injection.GetRestConfig(*serverURL, *kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
Expand Down
5 changes: 2 additions & 3 deletions leaderelection/chaosduck/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"errors"
"flag"
"knative.dev/pkg/injection"
"log"
"strings"
"time"
Expand All @@ -33,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/kflag"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
Expand Down Expand Up @@ -116,8 +116,7 @@ func quack(ctx context.Context, kc kubernetes.Interface, component string, leade
}

func main() {
ctx := signals.NewContext()
ctx = sharedmain.EnableInjectionOrDie(ctx, nil)
ctx, _ := injection.EnableInjectionOrDie(signals.NewContext(), nil)

kc := kubeclient.Get(ctx)

Expand Down

0 comments on commit 7dfb20f

Please sign in to comment.