/
destroyer.go
196 lines (176 loc) · 6.24 KB
/
destroyer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// Copyright 2019 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package apply
import (
"context"
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/apis/actuation"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/apply/solver"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/validation"
)
// Destroyer performs the step of grabbing all the previous inventory objects and
// prune them. This also deletes all the previous inventory objects
type Destroyer struct {
pruner *prune.Pruner
statusWatcher watcher.StatusWatcher
invClient inventory.Client
mapper meta.RESTMapper
client dynamic.Interface
openAPIGetter discovery.OpenAPISchemaInterface
infoHelper info.Helper
}
type DestroyerOptions struct {
// InventoryPolicy defines the inventory policy of apply.
InventoryPolicy inventory.Policy
// DryRunStrategy defines whether changes should actually be performed,
// or if it is just talk and no action.
DryRunStrategy common.DryRunStrategy
// DeleteTimeout defines how long we should wait for resources
// to be fully deleted.
DeleteTimeout time.Duration
// DeletePropagationPolicy defines the deletion propagation policy
// that should be used. If this is not provided, the default is to
// use the Background policy.
DeletePropagationPolicy metav1.DeletionPropagation
// EmitStatusEvents defines whether status events should be
// emitted on the eventChannel to the caller.
EmitStatusEvents bool
// ValidationPolicy defines how to handle invalid objects.
ValidationPolicy validation.Policy
}
func setDestroyerDefaults(o *DestroyerOptions) {
if o.DeletePropagationPolicy == "" {
o.DeletePropagationPolicy = metav1.DeletePropagationBackground
}
}
// Run performs the destroy step. Passes the inventory object. This
// happens asynchronously on progress and any errors are reported
// back on the event channel.
func (d *Destroyer) Run(ctx context.Context, invInfo inventory.Info, options DestroyerOptions) <-chan event.Event {
eventChannel := make(chan event.Event)
setDestroyerDefaults(&options)
go func() {
defer close(eventChannel)
// Retrieve the objects to be deleted from the cluster. Second parameter is empty
// because no local objects returns all inventory objects for deletion.
emptyLocalObjs := object.UnstructuredSet{}
deleteObjs, err := d.pruner.GetPruneObjs(invInfo, emptyLocalObjs, prune.Options{
DryRunStrategy: options.DryRunStrategy,
})
if err != nil {
handleError(eventChannel, err)
return
}
// Validate the resources to make sure we catch those problems early
// before anything has been updated in the cluster.
vCollector := &validation.Collector{}
validator := &validation.Validator{
Collector: vCollector,
Mapper: d.mapper,
}
validator.Validate(deleteObjs)
// Build a TaskContext for passing info between tasks
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
klog.V(4).Infoln("destroyer building task queue...")
deleteFilters := []filter.ValidationFilter{
filter.PreventRemoveFilter{},
filter.InventoryPolicyPruneFilter{
Inv: invInfo,
InvPolicy: options.InventoryPolicy,
},
filter.DependencyFilter{
TaskContext: taskContext,
ActuationStrategy: actuation.ActuationStrategyDelete,
DryRunStrategy: options.DryRunStrategy,
},
}
taskBuilder := &solver.TaskQueueBuilder{
Pruner: d.pruner,
DynamicClient: d.client,
OpenAPIGetter: d.openAPIGetter,
InfoHelper: d.infoHelper,
Mapper: d.mapper,
InvClient: d.invClient,
Collector: vCollector,
PruneFilters: deleteFilters,
}
opts := solver.Options{
Destroy: true,
Prune: true,
DryRunStrategy: options.DryRunStrategy,
PrunePropagationPolicy: options.DeletePropagationPolicy,
PruneTimeout: options.DeleteTimeout,
InventoryPolicy: options.InventoryPolicy,
}
// Build the ordered set of tasks to execute.
taskQueue := taskBuilder.
WithPruneObjects(deleteObjs).
WithInventory(invInfo).
Build(taskContext, opts)
klog.V(4).Infof("validation errors: %d", len(vCollector.Errors))
klog.V(4).Infof("invalid objects: %d", len(vCollector.InvalidIds))
// Handle validation errors
switch options.ValidationPolicy {
case validation.ExitEarly:
err = vCollector.ToError()
if err != nil {
handleError(eventChannel, err)
return
}
case validation.SkipInvalid:
for _, err := range vCollector.Errors {
handleValidationError(eventChannel, err)
}
default:
handleError(eventChannel, fmt.Errorf("invalid ValidationPolicy: %q", options.ValidationPolicy))
return
}
// Register invalid objects to be retained in the inventory, if present.
for _, id := range vCollector.InvalidIds {
taskContext.AddInvalidObject(id)
}
// Send event to inform the caller about the resources that
// will be pruned.
eventChannel <- event.Event{
Type: event.InitType,
InitEvent: event.InitEvent{
ActionGroups: taskQueue.ToActionGroups(),
},
}
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("destroyer building TaskStatusRunner...")
deleteIds := object.UnstructuredSetToObjMetadataSet(deleteObjs)
statusWatcher := d.statusWatcher
// Disable watcher for dry runs
if opts.DryRunStrategy.ClientOrServerDryRun() {
statusWatcher = watcher.BlindStatusWatcher{}
}
runner := taskrunner.NewTaskStatusRunner(deleteIds, statusWatcher)
klog.V(4).Infoln("destroyer running TaskStatusRunner...")
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
EmitStatusEvents: options.EmitStatusEvents,
})
if err != nil {
handleError(eventChannel, err)
return
}
}()
return eventChannel
}