-
Notifications
You must be signed in to change notification settings - Fork 0
/
applier.go
240 lines (204 loc) · 6.99 KB
/
applier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// Copyright Mia srl
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"context"
"fmt"
"slices"
"time"
"github.com/mia-platform/jpl/pkg/client/cache"
"github.com/mia-platform/jpl/pkg/event"
"github.com/mia-platform/jpl/pkg/filter"
"github.com/mia-platform/jpl/pkg/generator"
"github.com/mia-platform/jpl/pkg/inventory"
"github.com/mia-platform/jpl/pkg/mutator"
"github.com/mia-platform/jpl/pkg/poller"
"github.com/mia-platform/jpl/pkg/resource"
"github.com/mia-platform/jpl/pkg/runner"
"github.com/mia-platform/jpl/pkg/runner/task"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
)
// Applier can be used for appling a list of resources to a remote api-server
type Applier struct {
mapper meta.RESTMapper
client dynamic.Interface
infoFetcher task.InfoFetcher
runner runner.TaskRunner
inventory inventory.Store
generators []generator.Interface
mutators []mutator.Interface
filters []filter.Interface
poller poller.StatusPoller
}
// ApplierOptions options for the apply step
type ApplierOptions struct {
DryRun bool
Timeout time.Duration
FieldManager string
}
// Run will apply the passed objects to a remote api-server
func (a *Applier) Run(ctx context.Context, objects []*unstructured.Unstructured, options ApplierOptions) <-chan event.Event {
eventChannel := make(chan event.Event)
go func() {
defer close(eventChannel)
applierCtx := ctx
if options.Timeout > 0 {
var cancel context.CancelFunc
applierCtx, cancel = context.WithTimeout(ctx, options.Timeout)
defer cancel()
}
resourceCache := cache.NewCachedResourceGetter(a.mapper, a.client)
remoteObjects, err := a.loadObjectsFromInventory(applierCtx, resourceCache)
if err != nil {
handleError(eventChannel, err)
return
}
generatedObject, abort := a.generateObjects(objects, eventChannel, resourceCache)
if abort {
return
}
objects = append(objects, generatedObject...)
if abort := a.mutateObjects(objects, eventChannel, resourceCache); abort {
return
}
objectsToPrune := findObjectsToPrune(remoteObjects, objects)
manager := inventory.NewManager(a.inventory, remoteObjects)
queueBuilder := QueueBuilder{
Client: a.client,
Mapper: a.mapper,
Manager: manager,
RemoteGetter: resourceCache,
InfoFetcher: a.infoFetcher,
Filters: a.filters,
Poller: a.poller,
}
queueOptions := QueueOptions{
DryRun: options.DryRun,
Prune: true,
FieldManager: options.FieldManager,
}
contextState := &RunnerState{
eventChannel: eventChannel,
manager: manager,
context: applierCtx,
}
tasksQueue, err := queueBuilder.
WithObjects(objects).
WithPruneObjects(objectsToPrune).
Build(queueOptions)
if err != nil {
handleError(eventChannel, err)
return
}
if err := a.runner.RunWithQueue(contextState, tasksQueue); err != nil {
handleError(eventChannel, err)
}
}()
return eventChannel
}
// generateObjects will cycle through all the generator of the applier and accumulate any object generated by them,
// return false if an error is encountered
func (a *Applier) generateObjects(objects []*unstructured.Unstructured, eventChannel chan event.Event, remoteGetter cache.RemoteResourceGetter) ([]*unstructured.Unstructured, bool) {
var generatedObject []*unstructured.Unstructured
for _, rg := range a.generators {
for _, obj := range objects {
objMetadata := meta.AsPartialObjectMetadata(obj)
objMetadata.TypeMeta = metav1.TypeMeta{
Kind: obj.GetKind(),
APIVersion: obj.GetAPIVersion(),
}
if !rg.CanHandleResource(objMetadata) {
continue
}
generated, err := rg.Generate(obj, remoteGetter)
if err != nil {
handleError(eventChannel, fmt.Errorf("generate resource failed: %w", err))
return generatedObject, true
}
generatedObject = append(generatedObject, generated...)
}
}
return generatedObject, false
}
// mutateObjects will cycle through all the mutators of the applier, return false if an error is encountered
func (a *Applier) mutateObjects(objects []*unstructured.Unstructured, eventChannel chan event.Event, remoteGetter cache.RemoteResourceGetter) bool {
for _, mt := range a.mutators {
for _, obj := range objects {
objMetadata := meta.AsPartialObjectMetadata(obj)
objMetadata.TypeMeta = metav1.TypeMeta{
Kind: obj.GetKind(),
APIVersion: obj.GetAPIVersion(),
}
if !mt.CanHandleResource(objMetadata) {
continue
}
if err := mt.Mutate(obj, remoteGetter); err != nil {
handleError(eventChannel, fmt.Errorf("mutate resource failed: %w", err))
return true
}
}
}
return false
}
// loadObjectsFromInventory return the array of Unstructured objects that are being tracked in the inventory.
// It will skip objects that are not found, and return an error only in case some other problem is encountered
// during retrivial, like network problems, or missing permissions
func (a *Applier) loadObjectsFromInventory(ctx context.Context, cache cache.RemoteResourceGetter) ([]*unstructured.Unstructured, error) {
objIDs, err := a.inventory.Load(ctx)
if err != nil {
return nil, err
}
remoteObjects := make([]*unstructured.Unstructured, 0, len(objIDs))
for objID := range objIDs {
obj, err := cache.Get(ctx, objID)
if err != nil {
return nil, err
}
if obj != nil {
remoteObjects = append(remoteObjects, obj)
}
}
return slices.Clip(remoteObjects), nil
}
// findObjectsToPrune return the element in first that are not present in second
func findObjectsToPrune(first []*unstructured.Unstructured, second []*unstructured.Unstructured) []*unstructured.Unstructured {
firstSet := make(sets.Set[resource.ObjectMetadata], len(first))
for _, obj := range first {
firstSet.Insert(resource.ObjectMetadataFromUnstructured(obj))
}
for _, obj := range second {
firstSet.Delete(resource.ObjectMetadataFromUnstructured(obj))
}
returnedObjects := make([]*unstructured.Unstructured, 0, len(firstSet))
for _, obj := range first {
if firstSet.Has(resource.ObjectMetadataFromUnstructured(obj)) {
returnedObjects = append(returnedObjects, obj)
}
}
return returnedObjects
}
// handleError send a TypeError event in the channel with err payload
func handleError(channel chan event.Event, err error) {
channel <- event.Event{
Type: event.TypeError,
ErrorInfo: event.ErrorInfo{
Error: err,
},
}
}