-
Notifications
You must be signed in to change notification settings - Fork 8
/
bootstrap.go
277 lines (239 loc) · 8.09 KB
/
bootstrap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
package bootstrap
import (
"context"
"net/http"
"github.com/rotisserie/eris"
"github.com/solo-io/go-utils/errgroup"
"github.com/solo-io/skv2/pkg/stats"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/go-logr/zapr"
"github.com/solo-io/go-utils/contextutils"
v1 "github.com/solo-io/skv2/pkg/api/core.skv2.solo.io/v1"
"github.com/solo-io/skv2/pkg/multicluster"
"github.com/solo-io/skv2/pkg/multicluster/watch"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/log"
zaputil "sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
// required import to enable kube client-go auth plugins
_ "k8s.io/client-go/plugin/pkg/client/auth"
)
// StartParameters specify parameters for starting a generic controller which may need access to
// its local cluster as well as remote (multicluster) clients and managers.
type StartParameters struct {
	// MasterManager is the controller-runtime manager for the local ("master") cluster.
	MasterManager manager.Manager
	// McClient provides dynamic clients for registered remote clusters.
	McClient multicluster.Client // nil if running in agent mode
	// Clusters watches cluster registration/deregistration events.
	Clusters multicluster.Interface // nil if running in agent mode
	// SnapshotHistory records input/output snapshots for the debug/stats endpoints.
	SnapshotHistory stats.SnapshotHistory
	// Reference to Settings object this controller uses.
	SettingsRef v1.ObjectRef
	// enable additional (debug-level) logging
	VerboseMode bool
}
// StartFunc is the start function that will be called with the initialized StartParameters.
// Returning a non-nil error aborts the whole errgroup in StartMulti.
type StartFunc func(
	ctx context.Context,
	parameters StartParameters,
) error
// Options are the bootstrap options for starting discovery.
type Options struct {
	// MetricsBindPort is the TCP port that the controller should bind to
	// for serving prometheus metrics.
	// It can be set to 0 to disable the metrics serving.
	MetricsBindPort uint32
	// MasterNamespace if specified restricts the Master manager's cache to watch objects in the desired namespace.
	// Defaults to all namespaces.
	//
	// Note: If a namespace is specified, controllers can still Watch for a cluster-scoped resource (e.g Node). For namespaced resources the cache will only hold objects from the desired namespace.
	MasterNamespace string
	// VerboseMode enables debug-level logging (see setupLogging).
	VerboseMode bool
	// DevLogger enables the dev logger (instead of prod logger).
	// NOTE: DO NOT set this to true in Prod, it will crash on DPanic.
	DevLogger bool
	// ManagementContext if specified read the KubeConfig for the management cluster from this context. Only applies when running out of cluster.
	ManagementContext string
	// Reference to the Settings object that the controller should use.
	SettingsRef v1.ObjectRef
}
// AddToFlags registers all bootstrap options on the given spf13 flag set.
// Each flag defaults to the value currently held in opts, so callers may
// pre-populate the struct to change defaults without touching this method.
func (opts *Options) AddToFlags(flags *pflag.FlagSet) {
	flags.StringVarP(
		&opts.MasterNamespace,
		"namespace",
		"n",
		metav1.NamespaceAll,
		"if specified restricts the master manager's cache to watch objects in the desired namespace.",
	)
	flags.Uint32Var(
		&opts.MetricsBindPort,
		"metrics-port",
		opts.MetricsBindPort,
		"port on which to serve Prometheus metrics. set to 0 to disable",
	)
	// Default to the struct's current value, consistent with every other flag here.
	// Previously this hard-coded the default to true, silently enabling debug
	// logging even when the caller had not requested it.
	flags.BoolVar(&opts.VerboseMode, "verbose", opts.VerboseMode, "enables verbose/debug logging")
	flags.StringVar(
		&opts.ManagementContext,
		"context",
		"",
		"If specified, use this context from the selected KubeConfig to connect to the local (management) cluster.",
	)
	flags.StringVar(
		&opts.SettingsRef.Name,
		"settings-name",
		opts.SettingsRef.Name,
		"The name of the Settings object this controller should use.",
	)
	flags.StringVar(
		&opts.SettingsRef.Namespace,
		"settings-namespace",
		opts.SettingsRef.Namespace,
		"The namespace of the Settings object this controller should use.",
	)
	// This flag disables prod mode when set to false, in other words setting debug to true,
	// Which will cause the app to panic on DPanic.
	flags.BoolVar(
		&opts.DevLogger,
		"dev-logger",
		false,
		"Default: false. Set this value to true to enable debug panic logs for development.",
	)
	// MarkHidden only errors if the named flag does not exist; it was just registered above.
	_ = flags.MarkHidden("dev-logger")
}
// Start runs a controller with the given start func. The StartFunc will be called with a
// bootstrapped local manager. If localMode is false, the StartParameters will include
// initialized multicluster components.
func Start(ctx context.Context, start StartFunc, opts Options, schemes runtime.SchemeBuilder, localMode bool) error {
	// Wrap the single start func in an unnamed entry and delegate to StartMulti.
	funcs := map[string]StartFunc{"": start}
	return StartMulti(ctx, funcs, opts, schemes, localMode)
}
// StartMulti is like Start, but runs multiple StartFuncs concurrently.
//
// It bootstraps the local ("master") manager, starts the stats/metrics server in the
// background, optionally constructs multicluster components (when localMode is false),
// and then runs each StartFunc, the cluster watcher, and the manager itself in a single
// errgroup. The first goroutine to return a non-nil error cancels the shared context;
// the combined error is returned from eg.Wait().
func StartMulti(
	ctx context.Context,
	startFuncs map[string]StartFunc,
	opts Options,
	schemes runtime.SchemeBuilder,
	localMode bool,
	addStatsHandlers ...func(mux *http.ServeMux, profiles map[string]string),
) error {
	// Configure the global loggers (zap, klog, controller-runtime, go-utils) first so
	// everything below logs consistently.
	setupLogging(opts.VerboseMode, opts.DevLogger)
	mgr, err := makeMasterManager(opts, schemes)
	if err != nil {
		return err
	}
	snapshotHistory := stats.NewSnapshotHistory()
	// Serves metrics/stats in the background; panics on failure to bind (Must*).
	stats.MustStartServerBackground(snapshotHistory, opts.MetricsBindPort, addStatsHandlers...)
	var (
		clusterWatcher multicluster.Interface
		mcClient       multicluster.Client
	)
	if !localMode {
		// construct multicluster watcher and client
		clusterWatcher = watch.NewClusterWatcher(
			ctx, manager.Options{
				Namespace: "", // TODO (ilackarms): support configuring specific watch namespaces on remote clusters
				Scheme:    mgr.GetScheme(),
			},
		)
		mcClient = multicluster.NewClient(clusterWatcher)
	}
	// mcClient and clusterWatcher remain nil in local (agent) mode, as documented on
	// StartParameters.
	params := StartParameters{
		MasterManager:   mgr,
		McClient:        mcClient,
		Clusters:        clusterWatcher,
		SnapshotHistory: snapshotHistory,
		VerboseMode:     opts.VerboseMode,
		SettingsRef:     opts.SettingsRef,
	}
	eg, ctx := errgroup.WithContext(ctx)
	for name, start := range startFuncs {
		start := start // pike: capture loop variables for the closure below
		namedCtx := ctx
		if name != "" {
			// Tag this goroutine's logs with the start func's name.
			namedCtx = contextutils.WithLogger(namedCtx, name)
		}
		eg.Go(
			func() error {
				contextutils.LoggerFrom(namedCtx).Debugf("starting main goroutine")
				// Block until the master manager's cache has synced. The manager itself is
				// started concurrently in the goroutine below, which is what drives the
				// cache; this wait unblocks (or fails) once that happens.
				if synced := params.MasterManager.GetCache().WaitForCacheSync(namedCtx); !synced {
					return eris.Errorf("caches failed to sync")
				}
				err := start(namedCtx, params)
				if err != nil {
					contextutils.LoggerFrom(namedCtx).Errorf("main goroutine failed: %v", err)
				}
				return err
			},
		)
	}
	if clusterWatcher != nil {
		// start multicluster watches
		eg.Go(
			func() error {
				// Same cache-sync gate as above before running remote-cluster watches.
				if synced := params.MasterManager.GetCache().WaitForCacheSync(ctx); !synced {
					return eris.Errorf("caches failed to sync")
				}
				return clusterWatcher.Run(mgr)
			},
		)
	}
	eg.Go(
		func() error {
			// start the local manager; this is what starts the caches the other
			// goroutines are waiting on. Blocks until ctx is cancelled.
			ctx := contextutils.WithLogger(ctx, "controller-runtime-manager")
			contextutils.LoggerFrom(ctx).Infof("starting manager with options %+v", opts)
			return mgr.Start(ctx)
		},
	)
	return eg.Wait()
}
// makeMasterManager builds the controller-runtime manager for the local cluster; we
// will use this as our "master" cluster. The manager's own metrics endpoint is
// disabled ("0") because metrics are served by the custom stats server instead.
func makeMasterManager(opts Options, schemes runtime.SchemeBuilder) (manager.Manager, error) {
	restCfg, err := config.GetConfigWithContext(opts.ManagementContext)
	if err != nil {
		return nil, err
	}
	masterMgr, err := manager.New(restCfg, manager.Options{
		Namespace:          opts.MasterNamespace, // TODO (ilackarms): support configuring multiple watch namespaces on master cluster
		MetricsBindAddress: "0",                  // serve metrics using custom stats server
	})
	if err != nil {
		return nil, err
	}
	// Register any caller-supplied API types on the manager's scheme.
	if schemes == nil {
		return masterMgr, nil
	}
	if err := schemes.AddToScheme(masterMgr.GetScheme()); err != nil {
		return nil, err
	}
	return masterMgr, nil
}
// setupLogging wires a single zap logger into every logging framework this process
// uses: zap globals, controller-runtime, klog, and go-utils' fallback logger.
// verboseMode lowers the level to Debug; devLogging switches to zap's dev mode
// (human-readable encoder, panics on DPanic — never enable in prod).
func setupLogging(verboseMode, devLogging bool) {
	// Debug level when verbose was requested, Info otherwise.
	logLevel := zapcore.InfoLevel
	if verboseMode {
		logLevel = zapcore.DebugLevel
	}
	lvl := zap.NewAtomicLevelAt(logLevel)
	logOpts := []zaputil.Opts{zaputil.Level(&lvl)}
	if devLogging {
		// Dev mode uses a non-json (human readable) encoder, which makes it impossible
		// to use any json parsing tools on the log. Only enabled explicitly.
		logOpts = append(logOpts, zaputil.UseDevMode(true))
	}
	rootLogger := zaputil.NewRaw(logOpts...)
	// zap globals (used transitively by klog-adjacent code)
	zap.ReplaceGlobals(rootLogger)
	// controller-runtime and klog both take a logr.Logger
	bridged := zapr.NewLogger(rootLogger)
	log.SetLogger(bridged)
	klog.SetLogger(bridged)
	// go-utils
	contextutils.SetFallbackLogger(rootLogger.Sugar())
}