This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathcontroller.go
454 lines (417 loc) · 19.1 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
package clusterresource
import (
"context"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"runtime/debug"
"strings"
"time"
v1 "k8s.io/api/rbac/v1"
"github.com/lyft/flyteadmin/pkg/manager/impl/resources"
managerinterfaces "github.com/lyft/flyteadmin/pkg/manager/interfaces"
"github.com/lyft/flyteadmin/pkg/executioncluster/interfaces"
"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/kubernetes/scheme"
"github.com/lyft/flytestdlib/logger"
"google.golang.org/grpc/codes"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/repositories"
"github.com/lyft/flyteadmin/pkg/runtime"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flytestdlib/promutils"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
)
// Constants governing how placeholders inside template files are named and substituted.
const (
	// namespaceVariable is the template variable name reserved for the system-supplied namespace.
	namespaceVariable = "namespace"
	// templateVariableFormat is the fmt pattern that turns a variable name into the placeholder
	// text searched for inside template files.
	templateVariableFormat = "{{ %s }}"
	// replaceAllInstancesOfString is the count handed to strings.Replace so that every occurrence
	// of a placeholder is substituted.
	replaceAllInstancesOfString = -1
)
// Controller manages applying the desired templatized kubernetes resource files as resources
// in the execution kubernetes cluster.
type Controller interface {
// Sync performs a single synchronization pass and reports any errors encountered.
// The implementation in this file walks every project/domain pair and applies the
// not-yet-applied template files for the corresponding namespace.
Sync(ctx context.Context) error
// Run drives synchronization continuously. The implementation in this file calls Sync
// repeatedly, waiting the configured refresh interval between invocations.
Run()
}
// controllerMetrics bundles the prometheus counters emitted by the cluster resource controller.
type controllerMetrics struct {
// Scope is the metrics scope the counters below were created from.
Scope promutils.Scope
// SyncStarted counts invocations of Sync.
SyncStarted prometheus.Counter
// KubernetesResourcesCreated counts resources successfully created in kubernetes.
KubernetesResourcesCreated prometheus.Counter
// KubernetesResourcesCreateErrors counts create calls that failed for a reason other than
// the resource already existing.
KubernetesResourcesCreateErrors prometheus.Counter
// ResourcesAdded counts namespaces whose sync completed without error.
ResourcesAdded prometheus.Counter
// ResourceAddErrors counts namespaces whose sync returned an error.
ResourceAddErrors prometheus.Counter
// TemplateReadErrors counts failures reading a template file from the local filesystem.
TemplateReadErrors prometheus.Counter
// TemplateDecodeErrors counts failures decoding a substituted template into a kubernetes object.
TemplateDecodeErrors prometheus.Counter
// AppliedTemplateExists counts create calls rejected because the resource already exists.
AppliedTemplateExists prometheus.Counter
// TemplateUpdateErrors counts failed update/patch attempts on already existing resources.
TemplateUpdateErrors prometheus.Counter
// Panics counts panics recovered inside Sync.
Panics prometheus.Counter
}
// Aliases naming the pieces of the applied-template cache and the substitution table.
type (
	// FileName is the name of a template file as reported by os.FileInfo.Name.
	FileName = string
	// NamespaceName is the name of a kubernetes namespace.
	NamespaceName = string
	// LastModTimeCache maps a template file name to the modification time it had when it was applied.
	LastModTimeCache = map[FileName]time.Time
	// NamespaceCache maps a namespace to the templates recorded as applied within it.
	NamespaceCache = map[NamespaceName]LastModTimeCache

	// templateValuesType maps a rendered placeholder (e.g. "{{ name }}") to its substitution value.
	templateValuesType = map[string]string
)
// controller is the Controller implementation defined in this file.
type controller struct {
// db is used to list the projects whose namespaces are synced.
db repositories.RepositoryInterface
// config supplies the template path and data, domains, namespace mapping and refresh interval.
config runtimeInterfaces.Configuration
// executionCluster enumerates the target clusters that resources are applied to.
executionCluster interfaces.ClusterInterface
// resourceManager is queried for per project/domain cluster resource attribute overrides.
resourceManager managerinterfaces.ResourceInterface
// NOTE(review): poller is allocated by the constructor but not read anywhere in this file.
poller chan struct{}
metrics controllerMetrics
// lastAppliedTemplateDir is the template directory used by the previous sync; when the configured
// directory differs, appliedTemplates is reset.
lastAppliedTemplateDir string
// Map of [namespace -> [templateFileName -> last modified time]] recorded when a template was applied.
appliedTemplates NamespaceCache
}
// descCreatedAtSortParam orders results by "created_at", descending. Sync passes it when listing
// projects so that the most recently created projects are processed first.
// The error returned by common.NewSortParameter is discarded here.
var descCreatedAtSortParam, _ = common.NewSortParameter(admin.Sort{
Direction: admin.Sort_DESCENDING,
Key: "created_at",
})
// strategicPatchTypes is the set of resource kinds that are patched with a strategic-merge-patch
// (rather than updated) when the resource already exists, to mimic `kubectl apply` behavior for
// serviceaccounts. Kubectl defaults to using the StrategicMergePatch strategy.
// However the controller-runtime only has an implementation for MergePatch, which was formerly
// used here but failed to actually always merge resources in the Patch call.
// Interestingly, Patch doesn't actually appear to update the majority of resources, so the default
// is to use Update and only the kinds listed here are patched instead.
// If Update is used with a ServiceAccount, *every* call to Update results in a new corresponding
// secret being created, which has the (not so) fun side-effect of overwhelming the API server when
// this Sync script is run as a cron.
var strategicPatchTypes = map[string]bool{
v1.ServiceAccountKind: true,
}
// templateAlreadyApplied reports whether templateFile is recorded as applied in namespace with
// exactly the modification time it has now. A missing namespace entry, a missing file entry, or a
// differing modification time all yield false, meaning the template needs to be applied (again).
func (c *controller) templateAlreadyApplied(namespace NamespaceName, templateFile os.FileInfo) bool {
	if appliedInNamespace, namespaceKnown := c.appliedTemplates[namespace]; namespaceKnown {
		if appliedModTime, fileKnown := appliedInNamespace[templateFile.Name()]; fileKnown {
			// A file modified since it was applied no longer matches and must be re-applied.
			return appliedModTime.Equal(templateFile.ModTime())
		}
	}
	// Either the namespace or the file has never been recorded.
	return false
}
// populateTemplateValues resolves every configured template variable to a concrete string.
//
// For each variable the value is taken from the first data source field that is set: the inline
// Value, then the environment variable named by ValueFrom.EnvVar, then the contents of the file at
// ValueFrom.FilePath. The returned map is keyed by the rendered placeholder (see
// templateVariableFormat) rather than the bare variable name.
//
// Problems are collected across all variables rather than stopping at the first one: use of the
// reserved namespace variable, an unreadable file, no recognised source, or a source that resolves
// to the empty string. If any were collected, a nil map and a single combined error are returned.
func populateTemplateValues(data map[string]runtimeInterfaces.DataSource) (templateValuesType, error) {
	resolved := make(templateValuesType, len(data))
	failures := make([]error, 0)
	for variable, source := range data {
		if variable == namespaceVariable {
			// The namespace variable is specifically reserved for system use only.
			failures = append(failures, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
				"Cannot assign namespace template value in user data"))
			continue
		}

		var value string
		switch {
		case source.Value != "":
			value = source.Value
		case source.ValueFrom.EnvVar != "":
			value = os.Getenv(source.ValueFrom.EnvVar)
		case source.ValueFrom.FilePath != "":
			contents, err := ioutil.ReadFile(source.ValueFrom.FilePath)
			if err != nil {
				failures = append(failures, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
					"failed to substitute parameterized value for %s: unable to read value from: [%+v] with err: %v",
					variable, source.ValueFrom.FilePath, err))
				continue
			}
			value = string(contents)
		default:
			failures = append(failures, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
				"failed to substitute parameterized value for %s: unset or unrecognized ValueFrom: [%+v]", variable, source.ValueFrom))
			continue
		}

		// An unset environment variable or an empty file resolves to "" and is rejected.
		if value == "" {
			failures = append(failures, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
				"failed to substitute parameterized value for %s: unset. ValueFrom: [%+v]", variable, source.ValueFrom))
			continue
		}
		resolved[fmt.Sprintf(templateVariableFormat, variable)] = value
	}

	if len(failures) > 0 {
		return nil, errors.NewCollectedFlyteAdminError(codes.InvalidArgument, failures)
	}
	return resolved, nil
}
// populateDefaultTemplateValues resolves the per-domain default template data from the application
// config into a map of domain name -> (placeholder -> value), using populateTemplateValues for each
// domain. Failures are gathered across all domains; if any domain fails, a nil map and one combined
// error are returned.
func populateDefaultTemplateValues(defaultData map[runtimeInterfaces.DomainName]runtimeInterfaces.TemplateData) (
	map[string]templateValuesType, error) {
	var failures []error
	valuesByDomain := make(map[string]templateValuesType)
	for domain, domainData := range defaultData {
		if values, err := populateTemplateValues(domainData); err != nil {
			failures = append(failures, err)
		} else {
			valuesByDomain[domain] = values
		}
	}
	if len(failures) > 0 {
		return nil, errors.NewCollectedFlyteAdminError(codes.InvalidArgument, failures)
	}
	return valuesByDomain, nil
}
// getCustomTemplateValues builds the substitution table specific to one project/domain pair.
//
// It starts from a copy of domainTemplateValues (the configured defaults for the domain, which may
// be nil or empty) and overlays the cluster resource attributes stored for the project and domain,
// if any. Stored attributes take precedence over the domain defaults. The input map is not modified.
//
// A NotFound FlyteAdminError from the resource manager simply means there are no overrides. Any
// other error is returned, wrapped in a collected error, together with a nil map.
func (c *controller) getCustomTemplateValues(
	ctx context.Context, project, domain string, domainTemplateValues templateValuesType) (templateValuesType, error) {
	merged := make(templateValuesType, len(domainTemplateValues))
	for placeholder, value := range domainTemplateValues {
		merged[placeholder] = value
	}

	resource, err := c.resourceManager.GetResource(ctx, managerinterfaces.ResourceRequest{
		Project:      project,
		Domain:       domain,
		ResourceType: admin.MatchableResource_CLUSTER_RESOURCE,
	})
	if err != nil {
		adminErr, isAdminErr := err.(errors.FlyteAdminError)
		if !isAdminErr || adminErr.Code() != codes.NotFound {
			return nil, errors.NewCollectedFlyteAdminError(codes.InvalidArgument, []error{err})
		}
	}

	// All override values saved in the database take precedence over the domain-specific defaults.
	if resource != nil && resource.Attributes != nil {
		if overrides := resource.Attributes.GetClusterResourceAttributes(); overrides != nil {
			for key, value := range overrides.Attributes {
				merged[fmt.Sprintf(templateVariableFormat, key)] = value
			}
		}
	}
	return merged, nil
}
// syncNamespace loops through the kubernetes resource template files in the configured template
// directory. For each ".yaml" file not yet recorded as applied for this namespace it attempts to
// 1) read the template file
// 2) substitute templatized variables with their resolved values
// 3) decode the output of the above into a kubernetes resource
// 4) create the resource on every valid target cluster, falling back to an update (or a strategic
// merge patch for the kinds in strategicPatchTypes) when the resource already exists
//
// The system-supplied namespace placeholder is substituted first, then templateValues, and finally
// customTemplateValues, which therefore only fill in placeholders still left in the text.
// Neither input map is modified, and a nil map is treated as empty.
//
// When the configured template directory differs from the one used previously, the whole
// applied-template cache is discarded. Per-file and per-target failures do not stop processing;
// they are gathered and returned as one combined error once every file has been visited. Failing to
// list the template directory aborts immediately.
func (c *controller) syncNamespace(ctx context.Context, namespace NamespaceName,
	templateValues, customTemplateValues templateValuesType) error {
	templateDir := c.config.ClusterResourceConfiguration().GetTemplatePath()
	if c.lastAppliedTemplateDir != templateDir {
		// Invalidate all caches
		c.lastAppliedTemplateDir = templateDir
		c.appliedTemplates = make(NamespaceCache)
	}
	templateFiles, err := ioutil.ReadDir(templateDir)
	if err != nil {
		// Argument order matches the placeholders: directory first, namespace second.
		return errors.NewFlyteAdminErrorf(codes.Internal,
			"Failed to read config template dir [%s] for namespace [%s] with err: %v",
			templateDir, namespace, err)
	}

	// The namespace placeholder is always substituted by the system rather than fetched via a
	// user-specified source.
	namespacePlaceholder := fmt.Sprintf(templateVariableFormat, namespaceVariable)
	decode := scheme.Codecs.UniversalDeserializer().Decode

	collectedErrs := make([]error, 0)
	for _, templateFile := range templateFiles {
		templateFileName := templateFile.Name()
		if filepath.Ext(templateFileName) != ".yaml" {
			// nothing to do.
			logger.Debugf(ctx, "syncing namespace [%s]: ignoring unrecognized filetype [%s]",
				namespace, templateFileName)
			continue
		}

		if c.templateAlreadyApplied(namespace, templateFile) {
			// nothing to do.
			logger.Debugf(ctx, "syncing namespace [%s]: templateFile [%s] already applied, nothing to do.", namespace, templateFileName)
			continue
		}

		// 1) read the template file
		templateContents, readErr := ioutil.ReadFile(path.Join(templateDir, templateFileName))
		if readErr != nil {
			logger.Warningf(ctx,
				"failed to read config template from path [%s] for namespace [%s] with err: %v",
				templateFileName, namespace, readErr)
			collectedErrs = append(collectedErrs, errors.NewFlyteAdminErrorf(
				codes.Internal, "failed to read config template from path [%s] for namespace [%s] with err: %v",
				templateFileName, namespace, readErr))
			c.metrics.TemplateReadErrors.Inc()
			continue
		}
		logger.Debugf(ctx, "successfully read template config file [%s]", templateFileName)

		// 2) substitute templatized variables with their resolved values
		config := strings.Replace(string(templateContents), namespacePlaceholder, namespace, replaceAllInstancesOfString)
		for templateKey, templateValue := range templateValues {
			config = strings.Replace(config, templateKey, templateValue, replaceAllInstancesOfString)
		}
		// Replace remaining template variables from domain specific defaults.
		for templateKey, templateValue := range customTemplateValues {
			config = strings.Replace(config, templateKey, templateValue, replaceAllInstancesOfString)
		}

		// 3) decode the kubernetes resource template file into an actual resource object
		k8sObj, _, decodeErr := decode([]byte(config), nil, nil)
		if decodeErr != nil {
			logger.Warningf(ctx, "Failed to decode config template [%s] for namespace [%s] into a kubernetes object with err: %v",
				templateFileName, namespace, decodeErr)
			collectedErrs = append(collectedErrs, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
				"Failed to decode namespace config template [%s] for namespace [%s] into a kubernetes object with err: %v",
				templateFileName, namespace, decodeErr))
			c.metrics.TemplateDecodeErrors.Inc()
			continue
		}
		kind := k8sObj.GetObjectKind().GroupVersionKind().Kind

		// 4) create the resource on the kubernetes cluster and cache successful outcomes
		if _, ok := c.appliedTemplates[namespace]; !ok {
			c.appliedTemplates[namespace] = make(LastModTimeCache)
		}
		for _, target := range c.executionCluster.GetAllValidTargets() {
			// Each target gets its own copy of the decoded object.
			k8sObjCopy := k8sObj.DeepCopyObject()
			logger.Debugf(ctx, "Attempting to create resource [%+v] in cluster [%v] for namespace [%s]",
				kind, target.ID, namespace)

			createErr := target.Client.Create(ctx, k8sObjCopy)
			if createErr == nil {
				logger.Debugf(ctx, "Created resource [%+v] for namespace [%s] in kubernetes",
					kind, namespace)
				c.metrics.KubernetesResourcesCreated.Inc()
				c.appliedTemplates[namespace][templateFileName] = templateFile.ModTime()
				continue
			}

			if !k8serrors.IsAlreadyExists(createErr) {
				c.metrics.KubernetesResourcesCreateErrors.Inc()
				logger.Warningf(ctx, "Failed to create kubernetes object from config template [%s] for namespace [%s] with err: %v",
					templateFileName, namespace, createErr)
				collectedErrs = append(collectedErrs, errors.NewFlyteAdminErrorf(codes.Internal,
					"Failed to create kubernetes object from config template [%s] for namespace [%s] with err: %v",
					templateFileName, namespace, createErr))
				continue
			}

			logger.Debugf(ctx, "Type [%+v] in namespace [%s] already exists - attempting update instead",
				kind, namespace)
			c.metrics.AppliedTemplateExists.Inc()

			var updateErr error
			if strategicPatchTypes[kind] {
				updateErr = target.Client.Patch(ctx, k8sObjCopy, StrategicMergeFrom(k8sObjCopy))
			} else {
				updateErr = target.Client.Update(ctx, k8sObjCopy)
			}
			if updateErr != nil {
				c.metrics.TemplateUpdateErrors.Inc()
				logger.Warningf(ctx, "Failed to update resource [%+v] in namespace [%s] with err :%v",
					kind, namespace, updateErr)
				collectedErrs = append(collectedErrs, updateErr)
			} else {
				logger.Debugf(ctx, "Successfully updated resource [%+v] in namespace [%s]",
					kind, namespace)
			}
			// NOTE(review): the template is recorded as applied even when the update above failed,
			// so a failed update is not retried until the file's modification time changes or the
			// cache is reset. Confirm whether this is intentional before changing it.
			c.appliedTemplates[namespace][templateFileName] = templateFile.ModTime()
		}
	}

	if len(collectedErrs) > 0 {
		return errors.NewCollectedFlyteAdminError(codes.Internal, collectedErrs)
	}
	return nil
}
// Sync runs one synchronization pass: it lists all projects (most recently created first),
// resolves the configured template values, and for every project/domain pair applies the
// outstanding resource templates to the corresponding namespace.
//
// Failing to list projects or to resolve the global template values aborts the pass and returns
// that error; without the global values no namespace can be templated. Failures resolving
// domain-specific defaults, fetching per-project overrides, or syncing an individual namespace are
// logged and gathered, and the remaining namespaces are still processed. The gathered failures are
// returned as one combined error.
//
// A panic raised during the pass is recovered, counted and logged. Sync then returns nil.
func (c *controller) Sync(ctx context.Context) error {
	defer func() {
		if err := recover(); err != nil {
			c.metrics.Panics.Inc()
			// Pass the format string directly so that '%' characters in the panic value or the
			// stack trace are not re-interpreted as formatting verbs.
			logger.Warningf(ctx, "caught panic: %v [%+v]", err, string(debug.Stack()))
		}
	}()
	c.metrics.SyncStarted.Inc()
	logger.Debugf(ctx, "Running an invocation of ClusterResource Sync")

	// Prefer to sync projects most newly created to ensure their resources get created first when other resources exist.
	projects, err := c.db.ProjectRepo().ListAll(ctx, descCreatedAtSortParam)
	if err != nil {
		return err
	}
	domains := c.config.ApplicationConfiguration().GetDomainsConfig()
	var errs = make([]error, 0)

	templateValues, err := populateTemplateValues(c.config.ClusterResourceConfiguration().GetTemplateData())
	if err != nil {
		// populateTemplateValues returns a nil map on failure. Carrying on would hand that nil map
		// to syncNamespace for every namespace, so stop here and surface the configuration error.
		logger.Warningf(ctx, "Failed to get templatized values specified in config: %v", err)
		return err
	}

	// On failure domainTemplateValues is nil; indexing it below yields nil per-domain defaults.
	domainTemplateValues, err := populateDefaultTemplateValues(c.config.ClusterResourceConfiguration().GetCustomTemplateData())
	if err != nil {
		logger.Warningf(ctx, "Failed to get domain-specific templatized values specified in config: %v", err)
		errs = append(errs, err)
	}

	for _, project := range projects {
		for _, domain := range *domains {
			namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceMappingConfig(), project.Identifier, domain.Name)
			customTemplateValues, err := c.getCustomTemplateValues(
				ctx, project.Identifier, domain.ID, domainTemplateValues[domain.ID])
			if err != nil {
				// NOTE(review): the namespace is still synced below, with nil custom values.
				logger.Warningf(ctx, "Failed to get custom template values for %s with err: %v", namespace, err)
				errs = append(errs, err)
			}
			err = c.syncNamespace(ctx, namespace, templateValues, customTemplateValues)
			if err != nil {
				logger.Warningf(ctx, "Failed to create cluster resources for namespace [%s] with err: %v", namespace, err)
				c.metrics.ResourceAddErrors.Inc()
				errs = append(errs, err)
			} else {
				c.metrics.ResourcesAdded.Inc()
				logger.Debugf(ctx, "Successfully created kubernetes resources for [%s]", namespace)
			}
		}
	}
	if len(errs) > 0 {
		return errors.NewCollectedFlyteAdminError(codes.Internal, errs)
	}
	return nil
}
// Run invokes Sync repeatedly, pausing the configured refresh interval between invocations.
// It uses a background context, logs (rather than returns) any error from Sync, and relies on
// wait.Forever for the looping.
func (c *controller) Run() {
	ctx := context.Background()
	logger.Debugf(ctx, "Running ClusterResourceController")

	syncOnce := func() {
		if err := c.Sync(ctx); err != nil {
			logger.Warningf(ctx, "Failed cluster resource creation loop with: %v", err)
		}
	}
	wait.Forever(syncOnce, c.config.ClusterResourceConfiguration().GetRefreshInterval())
}
// newMetrics registers every counter used by the controller on the given scope and returns them
// bundled together. Counters are created with the scope's MustNewCounter.
func newMetrics(scope promutils.Scope) controllerMetrics {
	return controllerMetrics{
		Scope: scope,
		SyncStarted: scope.MustNewCounter("k8s_resource_syncs",
			"overall count of the number of invocations of the resource controller 'sync' method"),
		KubernetesResourcesCreated: scope.MustNewCounter("k8s_resources_created",
			"overall count of successfully created resources in kubernetes"),
		KubernetesResourcesCreateErrors: scope.MustNewCounter("k8s_resource_create_errors",
			"overall count of errors encountered attempting to create resources in kubernetes"),
		ResourcesAdded: scope.MustNewCounter("resources_added",
			"overall count of successfully added resources for namespaces"),
		ResourceAddErrors: scope.MustNewCounter("resource_add_errors",
			"overall count of errors encountered creating resources for namespaces"),
		TemplateReadErrors: scope.MustNewCounter("template_read_errors",
			"errors encountered reading the yaml template file from the local filesystem"),
		TemplateDecodeErrors: scope.MustNewCounter("template_decode_errors",
			"errors encountered trying to decode yaml template into k8s go struct"),
		// Help text reworded: the original read "the system to tried to apply ... the kubernetes reported".
		AppliedTemplateExists: scope.MustNewCounter("applied_template_exists",
			"Number of times the system tried to apply an uncached resource that kubernetes reported as "+
				"already existing"),
		// Help text fixed: the two concatenated halves were previously joined without a space.
		TemplateUpdateErrors: scope.MustNewCounter("template_update_errors",
			"Number of times an attempt at updating an already existing kubernetes resource with a template "+
				"file failed"),
		Panics: scope.MustNewCounter("panics",
			"overall count of panics encountered in primary ClusterResourceController loop"),
	}
}
// NewClusterResourceController builds a Controller backed by the given repository and execution
// cluster, registering its metrics on scope. Configuration comes from a freshly created
// configuration provider, and the applied-template cache starts out empty.
func NewClusterResourceController(db repositories.RepositoryInterface, executionCluster interfaces.ClusterInterface, scope promutils.Scope) Controller {
	configProvider := runtime.NewConfigurationProvider()
	resourceManager := resources.NewResourceManager(db, configProvider.ApplicationConfiguration())

	ctrl := &controller{
		db:               db,
		config:           configProvider,
		executionCluster: executionCluster,
		resourceManager:  resourceManager,
		poller:           make(chan struct{}),
		metrics:          newMetrics(scope),
		appliedTemplates: make(NamespaceCache),
	}
	return ctrl
}