/
solver.go
290 lines (258 loc) · 9.22 KB
/
solver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Package solver is responsible for constructing a
// task queue based on the set of resources that should be
// applied.
// This involves setting up the appropriate sequence of
// apply, wait and prune tasks so that any dependencies between
// resources don't cause a later apply operation to
// fail.
// Currently this package assumes that the resources have
// already been sorted in the appropriate order. We might
// want to consider moving the sorting functionality into
// this package.
package solver
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/mutator"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/apply/task"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/graph"
"sigs.k8s.io/cli-utils/pkg/object/validation"
)
// TaskQueueBuilder assembles a TaskQueue from an inventory, a set of objects
// to apply and a set of objects to prune. The exported fields are
// dependencies that Build hands to the tasks it creates; the unexported
// fields are populated through the With* methods and by Build itself.
type TaskQueueBuilder struct {
// Pruner is handed to every prune task.
Pruner *prune.Pruner
// DynamicClient, OpenAPIGetter and InfoHelper are handed to every apply task.
DynamicClient dynamic.Interface
OpenAPIGetter discovery.OpenAPISchemaInterface
InfoHelper info.Helper
// Mapper is handed to every apply task and every wait task.
Mapper meta.RESTMapper
// InvClient is handed to the inventory tasks; Build also calls its
// GetClusterObjs method to obtain the previous inventory contents.
InvClient inventory.Client
// Collector is used to collect validation errors and invalid objects.
// Invalid objects will be filtered and not be injected into tasks.
Collector *validation.Collector
// ApplyFilters and ApplyMutators are handed to every apply task.
ApplyFilters []filter.ValidationFilter
ApplyMutators []mutator.Interface
// PruneFilters is handed to every prune task.
PruneFilters []filter.ValidationFilter
// Counters used to number task names ("apply-N", "prune-N", "wait-N").
// Build resets them to zero before creating any tasks.
applyCounter int
pruneCounter int
waitCounter int
// invInfo is set by WithInventory and passed to the inventory tasks.
invInfo inventory.Info
// applyObjs is set by WithApplyObjects; pruneObjs by WithPruneObjects.
applyObjs object.UnstructuredSet
pruneObjs object.UnstructuredSet
}
// TaskQueue is the result of TaskQueueBuilder.Build: an ordered list of tasks
// that can be exposed as a channel (ToChannel) or summarized as action groups
// (ToActionGroups).
type TaskQueue struct {
// tasks holds the tasks in the order Build appended them.
tasks []taskrunner.Task
}
// ToChannel returns a buffered channel pre-loaded with every task in the
// queue, in queue order. The channel's capacity equals the number of tasks,
// so none of the sends block. The channel is returned open; it is not closed
// here.
func (tq *TaskQueue) ToChannel() chan taskrunner.Task {
	ch := make(chan taskrunner.Task, len(tq.tasks))
	for i := range tq.tasks {
		ch <- tq.tasks[i]
	}
	return ch
}
// ToActionGroups describes each queued task as an event.ActionGroup carrying
// the task's name, action and identifiers. The groups are returned in queue
// order; the result is nil when the queue holds no tasks.
func (tq *TaskQueue) ToActionGroups() []event.ActionGroup {
	var groups []event.ActionGroup
	for i := range tq.tasks {
		queued := tq.tasks[i]
		group := event.ActionGroup{
			Name:        queued.Name(),
			Action:      queued.Action(),
			Identifiers: queued.Identifiers(),
		}
		groups = append(groups, group)
	}
	return groups
}
// Options carries the settings that control which tasks Build creates and
// how those tasks are configured.
type Options struct {
// ServerSideOptions is passed through to every apply task.
ServerSideOptions common.ServerSideOptions
// ReconcileTimeout is the timeout given to the wait task that follows each
// apply task.
ReconcileTimeout time.Duration
// True if we are destroying, which deletes the inventory object
// as well as (possibly) the inventory namespace.
// When set, Build skips the inventory-add task and names the final
// inventory task "inventory-delete-or-update-0" instead of "inventory-set-0".
Destroy bool
// True if we're deleting prune objects. Build only creates prune tasks
// when this is set and there is at least one prune object.
Prune bool
// DryRunStrategy is passed to the inventory, apply and prune tasks. When it
// reports a client or server dry-run, Build adds no wait tasks.
DryRunStrategy common.DryRunStrategy
// PrunePropagationPolicy is passed through to every prune task.
PrunePropagationPolicy metav1.DeletionPropagation
// PruneTimeout is the timeout given to the wait task that follows each
// prune task.
PruneTimeout time.Duration
// InventoryPolicy is not read by any code in this file.
// NOTE(review): presumably consumed by callers or filters elsewhere — confirm.
InventoryPolicy inventory.Policy
}
// WithInventory stores inv as the inventory info that later Build calls pass
// to the inventory tasks. The receiver is returned so calls can be chained.
func (t *TaskQueueBuilder) WithInventory(inv inventory.Info) *TaskQueueBuilder {
	t.invInfo = inv

	return t
}
// WithApplyObjects stores applyObjs as the set of objects that later Build
// calls turn into apply tasks. The receiver is returned so calls can be
// chained.
func (t *TaskQueueBuilder) WithApplyObjects(applyObjs object.UnstructuredSet) *TaskQueueBuilder {
	t.applyObjs = applyObjs

	return t
}
// WithPruneObjects stores pruneObjs as the set of objects that later Build
// calls turn into prune tasks. The receiver is returned so calls can be
// chained.
func (t *TaskQueueBuilder) WithPruneObjects(pruneObjs object.UnstructuredSet) *TaskQueueBuilder {
	t.pruneObjs = pruneObjs

	return t
}
// Build assembles and returns the queue of tasks for the inventory, apply
// objects and prune objects previously set on the builder.
//
// The queue is laid out in this order:
//  1. an inventory-add task (skipped when o.Destroy is set);
//  2. for each apply phase, an apply task followed by a wait task;
//  3. when o.Prune is set, for each prune phase (reverse of apply order), a
//     prune task followed by a wait task;
//  4. a final task that sets the inventory or, on destroy, deletes/updates it.
//
// Wait tasks are omitted when o.DryRunStrategy reports a dry-run.
//
// Errors from building or sorting the dependency graph are recorded in
// t.Collector rather than returned, and objects the collector considers
// invalid are left out of the tasks. As side effects, the dependency graph is
// stored on taskContext and every planned apply/prune is registered with
// taskContext's inventory manager. The builder's task counters are reset at
// the start of each call.
func (t *TaskQueueBuilder) Build(taskContext *taskrunner.TaskContext, o Options) *TaskQueue {
	var tasks []taskrunner.Task

	// Reset the counters so task names start from 0 on every Build call.
	t.applyCounter = 0
	t.pruneCounter = 0
	t.waitCounter = 0

	// Filter objects that failed earlier validation.
	applyObjs := t.Collector.FilterInvalidObjects(t.applyObjs)
	pruneObjs := t.Collector.FilterInvalidObjects(t.pruneObjs)

	// Merge applyObjs & pruneObjs and graph them together.
	// This detects implicit and explicit dependencies.
	// Invalid dependency annotations will be treated as validation errors.
	allObjs := make(object.UnstructuredSet, 0, len(applyObjs)+len(pruneObjs))
	allObjs = append(allObjs, applyObjs...)
	allObjs = append(allObjs, pruneObjs...)
	g, err := graph.DependencyGraph(allObjs)
	if err != nil {
		t.Collector.Collect(err)
	}
	// Store graph for use by DependencyFilter.
	taskContext.SetGraph(g)

	// Sort objects into phases (apply order).
	// Cycles will be treated as validation errors.
	idSetList, err := g.Sort()
	if err != nil {
		t.Collector.Collect(err)
	}

	// Filter again: graphing and sorting may have marked more objects invalid
	// (cycles or invalid dependency annotations).
	applyObjs = t.Collector.FilterInvalidObjects(applyObjs)
	pruneObjs = t.Collector.FilterInvalidObjects(pruneObjs)

	if !o.Destroy {
		// InvAddTask creates the inventory and adds any objects being applied.
		klog.V(2).Infof("adding inventory add task (%d objects)", len(applyObjs))
		tasks = append(tasks, &task.InvAddTask{
			TaskName:  "inventory-add-0",
			InvClient: t.InvClient,
			InvInfo:   t.invInfo,
			Objects:   applyObjs,
			DryRun:    o.DryRunStrategy,
		})
	}

	if len(applyObjs) > 0 {
		// Register actuation plan in the inventory.
		for _, id := range object.UnstructuredSetToObjMetadataSet(applyObjs) {
			taskContext.InventoryManager().AddPendingApply(id)
		}

		// Filter idSetList down to just apply objects.
		applySets := graph.HydrateSetList(idSetList, applyObjs)

		for _, applySet := range applySets {
			tasks = append(tasks,
				t.newApplyTask(applySet, t.ApplyFilters, t.ApplyMutators, o))
			// dry-run skips wait tasks
			if !o.DryRunStrategy.ClientOrServerDryRun() {
				applyIds := object.UnstructuredSetToObjMetadataSet(applySet)
				tasks = append(tasks,
					t.newWaitTask(applyIds, taskrunner.AllCurrent, o.ReconcileTimeout))
			}
		}
	}

	if o.Prune && len(pruneObjs) > 0 {
		// Register actuation plan in the inventory.
		for _, id := range object.UnstructuredSetToObjMetadataSet(pruneObjs) {
			taskContext.InventoryManager().AddPendingDelete(id)
		}

		// Filter idSetList down to just prune objects.
		pruneSets := graph.HydrateSetList(idSetList, pruneObjs)

		// Reverse apply order to get prune order.
		graph.ReverseSetList(pruneSets)

		for _, pruneSet := range pruneSets {
			tasks = append(tasks,
				t.newPruneTask(pruneSet, t.PruneFilters, o))
			// dry-run skips wait tasks
			if !o.DryRunStrategy.ClientOrServerDryRun() {
				pruneIds := object.UnstructuredSetToObjMetadataSet(pruneSet)
				tasks = append(tasks,
					t.newWaitTask(pruneIds, taskrunner.AllNotFound, o.PruneTimeout))
			}
		}
	}

	// Fetch the previous inventory contents for the final inventory task.
	// Build has no error return, so a failure here cannot be propagated; log
	// it instead of discarding it, and carry on with whatever was returned so
	// the queue is still built exactly as before.
	prevInvIds, err := t.InvClient.GetClusterObjs(t.invInfo)
	if err != nil {
		klog.Warningf("unable to retrieve previous inventory objects: %v", err)
	}

	klog.V(2).Infoln("adding delete/update inventory task")
	taskName := "inventory-set-0"
	if o.Destroy {
		taskName = "inventory-delete-or-update-0"
	}
	tasks = append(tasks, &task.DeleteOrUpdateInvTask{
		TaskName:      taskName,
		InvClient:     t.InvClient,
		InvInfo:       t.invInfo,
		PrevInventory: prevInvIds,
		DryRun:        o.DryRunStrategy,
		Destroy:       o.Destroy,
	})

	return &TaskQueue{tasks: tasks}
}
// newApplyTask creates an apply task for applyObjs, after dropping any objects
// the collector considers invalid. The task is named "apply-N", where N is the
// builder's current apply counter; the counter is then advanced for the next
// call. Filters, mutators and the relevant options and clients are copied
// onto the task.
func (t *TaskQueueBuilder) newApplyTask(applyObjs object.UnstructuredSet,
	applyFilters []filter.ValidationFilter, applyMutators []mutator.Interface, o Options) taskrunner.Task {
	validObjs := t.Collector.FilterInvalidObjects(applyObjs)
	klog.V(2).Infof("adding apply task (%d objects)", len(validObjs))

	name := fmt.Sprintf("apply-%d", t.applyCounter)
	t.applyCounter++

	return &task.ApplyTask{
		TaskName:          name,
		Objects:           validObjs,
		Filters:           applyFilters,
		Mutators:          applyMutators,
		ServerSideOptions: o.ServerSideOptions,
		DryRunStrategy:    o.DryRunStrategy,
		DynamicClient:     t.DynamicClient,
		OpenAPIGetter:     t.OpenAPIGetter,
		InfoHelper:        t.InfoHelper,
		Mapper:            t.Mapper,
	}
}
// newWaitTask creates a task that waits for waitIds to satisfy condition,
// bounded by waitTimeout, after dropping any ids the collector considers
// invalid. The task is named "wait-N", where N is the builder's current wait
// counter; the counter is advanced once the task has been created.
func (t *TaskQueueBuilder) newWaitTask(waitIds object.ObjMetadataSet, condition taskrunner.Condition,
	waitTimeout time.Duration) taskrunner.Task {
	validIds := t.Collector.FilterInvalidIds(waitIds)
	klog.V(2).Infoln("adding wait task")

	name := fmt.Sprintf("wait-%d", t.waitCounter)
	waitTask := taskrunner.NewWaitTask(name, validIds, condition, waitTimeout, t.Mapper)
	t.waitCounter++

	return waitTask
}
// newPruneTask creates a prune task for pruneObjs, after dropping any objects
// the collector considers invalid. The task is named "prune-N", where N is the
// builder's current prune counter; the counter is then advanced for the next
// call. The filters, the builder's Pruner and the relevant options are copied
// onto the task.
func (t *TaskQueueBuilder) newPruneTask(pruneObjs object.UnstructuredSet,
	pruneFilters []filter.ValidationFilter, o Options) taskrunner.Task {
	validObjs := t.Collector.FilterInvalidObjects(pruneObjs)
	klog.V(2).Infof("adding prune task (%d objects)", len(validObjs))

	name := fmt.Sprintf("prune-%d", t.pruneCounter)
	t.pruneCounter++

	return &task.PruneTask{
		TaskName:          name,
		Objects:           validObjs,
		Filters:           pruneFilters,
		Pruner:            t.Pruner,
		PropagationPolicy: o.PrunePropagationPolicy,
		DryRunStrategy:    o.DryRunStrategy,
		Destroy:           o.Destroy,
	}
}