-
Notifications
You must be signed in to change notification settings - Fork 153
/
apply_manifests.go
95 lines (76 loc) · 2.67 KB
/
apply_manifests.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package run
import (
"bytes"
"context"
"fmt"
"time"
"github.com/fluxcd/pkg/ssa"
"github.com/weaveworks/weave-gitops/pkg/logger"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/cli-runtime/pkg/genericclioptions"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
// apply is the equivalent of 'kubectl apply --server-side -f'.
//
// The manifests are decoded into unstructured objects and applied in two
// stages: cluster definitions (CRDs and Namespaces) are applied first and
// waited on, then every remaining object is applied. On success the string
// form of the accumulated change set is returned; on failure the result is
// the empty string together with the underlying error.
func apply(log logger.Logger, ctx context.Context, kubeClient ctrlclient.Client, kubeConfigArgs genericclioptions.RESTClientGetter, manifestsContent []byte) (string, error) {
	objects, readErr := ssa.ReadObjects(bytes.NewReader(manifestsContent))
	switch {
	case readErr != nil:
		log.Failuref("Error reading Kubernetes objects from the manifests")
		return "", readErr
	case len(objects) == 0:
		return "", fmt.Errorf("no Kubernetes objects found in the manifests")
	}

	if defaultsErr := ssa.SetNativeKindsDefaults(objects); defaultsErr != nil {
		log.Failuref("Error setting native kinds defaults")
		return "", defaultsErr
	}

	// definitions holds only CRDs and Namespaces; others holds everything else.
	var definitions, others []*unstructured.Unstructured
	for _, obj := range objects {
		if !ssa.IsClusterDefinition(obj) {
			others = append(others, obj)
			continue
		}
		definitions = append(definitions, obj)
	}

	result := ssa.NewChangeSet()

	// Stage one: apply the cluster definitions, then wait on what has been
	// applied so far before moving on to the remaining objects.
	if len(definitions) != 0 {
		applied, applyErr := applySet(log, ctx, kubeClient, kubeConfigArgs, definitions)
		if applyErr != nil {
			log.Failuref("Error applying stage one objects")
			return "", applyErr
		}
		result.Append(applied.Entries)

		if waitErr := waitForSet(log, ctx, kubeClient, kubeConfigArgs, result); waitErr != nil {
			log.Failuref("Error waiting for set")
			return "", waitErr
		}
	}

	// Stage two: apply all remaining objects; no wait is performed here.
	if len(others) != 0 {
		applied, applyErr := applySet(log, ctx, kubeClient, kubeConfigArgs, others)
		if applyErr != nil {
			log.Failuref("Error applying stage two objects")
			return "", applyErr
		}
		result.Append(applied.Entries)
	}

	return result.String(), nil
}
// applySet builds a resource manager via newManager and uses it to apply the
// given objects with the default ssa apply options. It returns the change set
// reported by the manager, or nil and the error if the manager could not be
// created.
func applySet(log logger.Logger, ctx context.Context, kubeClient ctrlclient.Client, kubeConfigArgs genericclioptions.RESTClientGetter, objects []*unstructured.Unstructured) (*ssa.ChangeSet, error) {
	manager, mgrErr := newManager(log, ctx, kubeClient, kubeConfigArgs)
	if mgrErr != nil {
		log.Failuref("Error applying set")
		return nil, mgrErr
	}

	applyOpts := ssa.DefaultApplyOptions()
	return manager.ApplyAll(ctx, objects, applyOpts)
}
// waitForSet builds a resource manager via newManager and waits on the objects
// recorded in changeSet, polling every two seconds with a one-minute timeout.
// It returns the manager-creation error or whatever the wait itself reports.
func waitForSet(log logger.Logger, ctx context.Context, kubeClient ctrlclient.Client, rcg genericclioptions.RESTClientGetter, changeSet *ssa.ChangeSet) error {
	manager, mgrErr := newManager(log, ctx, kubeClient, rcg)
	if mgrErr != nil {
		log.Failuref("Error waiting for set")
		return mgrErr
	}

	targets := changeSet.ToObjMetadataSet()
	waitOpts := ssa.WaitOptions{
		Interval: 2 * time.Second,
		Timeout:  time.Minute,
	}
	return manager.WaitForSet(targets, waitOpts)
}