This repository has been archived by the owner on May 10, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
mutator.go
130 lines (110 loc) · 5.66 KB
/
mutator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package mutation
import (
goCtx "context"
"github.com/pkg/errors"
"github.com/riotkit-org/volume-syncing-controller/pkg/apis/riotkit.org/v1alpha1"
"github.com/riotkit-org/volume-syncing-controller/pkg/client/clientset/versioned"
"github.com/riotkit-org/volume-syncing-controller/pkg/server/cache"
"github.com/riotkit-org/volume-syncing-controller/pkg/server/context"
"github.com/riotkit-org/volume-syncing-controller/pkg/server/encryption"
"github.com/sirupsen/logrus"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// PodMutationController mutates incoming `kind: Pod` objects according to a matching
// `kind: PodFilesystemSync` definition and then updates that definition's status.
type PodMutationController struct {
// cache is queried (FindMatchingForPod) for the PodFilesystemSync that applies to a given Pod.
cache *cache.Cache
// rktClient is the typed client used to fetch PodFilesystemSync objects and update their status.
rktClient versioned.Interface
// kubeClient is handed to encryption.AttachAutomaticEncryption while preparing a mutation.
kubeClient kubernetes.Interface
}
// parseRequestIntoModels turns an admission request into the objects the mutation works on.
//
// It returns, in order: the Pod resolved from the request (the one that will be mutated),
// a deep copy of that Pod taken before any further processing, and the PodFilesystemSync
// that the cache reports as matching the Pod.
//
// On failure all three objects are fresh empty values (never nil) and the error explains
// whether the Pod could not be resolved, the lookup failed, or nothing matched.
func (m *PodMutationController) parseRequestIntoModels(request *admissionv1.AdmissionRequest) (*corev1.Pod, *corev1.Pod, *v1alpha1.PodFilesystemSync, error) {
	// fail produces the error-path result: empty placeholder objects plus the given error.
	fail := func(err error) (*corev1.Pod, *corev1.Pod, *v1alpha1.PodFilesystemSync, error) {
		return &corev1.Pod{}, &corev1.Pod{}, &v1alpha1.PodFilesystemSync{}, err
	}

	target, err := ResolvePod(request)
	if err != nil {
		return fail(errors.Wrap(err, "Cannot process AdmissionRequest, cannot resolve Pod"))
	}

	// Snapshot the Pod before anything else touches it, so the caller can compare both versions.
	snapshot := target.DeepCopy()

	// Look up the `kind: PodFilesystemSync` that describes how to synchronize this `kind: Pod`.
	syncDef, found, lookupErr := m.cache.FindMatchingForPod(target)
	switch {
	case lookupErr != nil:
		return fail(errors.Wrap(lookupErr, "Cannot match any `kind: PodFilesystemSync` for selected `kind: Pod`"))
	case !found:
		return fail(errors.New("No matching `kind: PodFilesystemSync` found for Pod"))
	}

	return target, snapshot, syncDef, nil
}
// ProcessAdmissionRequest is retrieving all the required data, calling to resolve, then calling a mutation action
func (m *PodMutationController) ProcessAdmissionRequest(review *admissionv1.AdmissionReview, image string) (corev1.Pod, corev1.Pod, error) {
pod, originalPod, matchingPodFilesystemSync, resolveErr := m.parseRequestIntoModels(review.Request)
if resolveErr != nil {
return corev1.Pod{}, corev1.Pod{}, resolveErr
}
// verify secrets
secretsVerificationErr := VerifySecrets(matchingPodFilesystemSync, pod.Namespace)
if secretsVerificationErr != nil {
return corev1.Pod{}, corev1.Pod{}, errors.Wrap(secretsVerificationErr, "The secrets are invalid")
}
// prepare environment variables
// DO NOT CONFUSE WITH SECRETS - those are mounted to not leak sensitive information in `kind: Pod` definition
env, envResolveErr := ResolveTemplatedEnvironment(pod, matchingPodFilesystemSync)
if envResolveErr != nil {
return corev1.Pod{}, corev1.Pod{}, errors.Wrap(envResolveErr, "Cannot resolve environment variables")
}
// this context keeps decision about what synchronization options are to be used
params, paramsErr := context.NewSynchronizationParameters(pod, matchingPodFilesystemSync, env)
if paramsErr != nil {
return corev1.Pod{}, corev1.Pod{}, errors.Wrap(paramsErr, "Cannot create patch for `kind: Pod`")
}
if err := encryption.AttachAutomaticEncryption(matchingPodFilesystemSync, ¶ms, m.kubeClient); err != nil {
return corev1.Pod{}, corev1.Pod{}, errors.Wrap(err, "Error while trying to setup automatic encryption")
}
// finally try to patch the `kind: Pod` using definition from `kind: PodFilesystemSync`
if err := m.applyPatchToPod(pod, image, matchingPodFilesystemSync, params); err != nil {
return corev1.Pod{}, corev1.Pod{}, errors.Wrap(err, "Cannot mutate `kind: Pod`")
}
// change status
if claimErr := matchingPodFilesystemSync.ClaimDirectoryByPod(pod); claimErr != nil {
return corev1.Pod{}, corev1.Pod{}, errors.Wrap(claimErr, "Cannot claim directory for `kind: Pod`")
}
if err := m.updateStatus(matchingPodFilesystemSync); err != nil {
return corev1.Pod{}, corev1.Pod{}, err
}
return *pod, *originalPod, nil
}
// updateStatus writes the status of the given PodFilesystemSync through the typed client.
//
// The object is first fetched by name from syncDefinition's namespace; the fetched object's
// resource version is then copied onto syncDefinition (mutating the argument) before
// UpdateStatus is called - presumably so the update is not rejected as stale.
//
// It returns a wrapped error when either the fetch or the status update fails.
func (m *PodMutationController) updateStatus(syncDefinition *v1alpha1.PodFilesystemSync) error {
	logrus.Debug("Updating status")

	syncs := m.rktClient.RiotkitV1alpha1().PodFilesystemSyncs(syncDefinition.Namespace)

	stored, err := syncs.Get(goCtx.TODO(), syncDefinition.Name, v1.GetOptions{})
	if err != nil {
		return errors.Wrap(err, "Cannot update status field - error retrieving object")
	}

	// Carry over the resource version of the object that was just fetched.
	syncDefinition.SetResourceVersion(stored.GetResourceVersion())

	if _, err := syncs.UpdateStatus(goCtx.TODO(), syncDefinition, v1.UpdateOptions{}); err != nil {
		return errors.Wrap(err, "Cannot update status field")
	}
	return nil
}
// applyPatchToPod mutates the given `kind: Pod` in place by injecting the synchronization
// containers described by syncDefinition.
//
// It asks syncDefinition whether files should be restored from remote on init and whether
// they should be synchronized to remote, then forwards both decisions together with image
// and params to MutatePodByInjectingContainers.
//
// It returns a wrapped error when the restore decision cannot be made or the injection fails.
func (m *PodMutationController) applyPatchToPod(pod *corev1.Pod, image string, syncDefinition *v1alpha1.PodFilesystemSync, params context.SynchronizationParameters) error {
	// Decide whether the init container doing the remote-to-local sync should be started.
	restoreOnInit, err := syncDefinition.ShouldRestoreFilesFromRemote(pod)
	if err != nil {
		return errors.Wrap(err, "Error creating patch for `kind: Pod` - cannot decide if the `kind: Pod` should restore files from remote on init")
	}

	syncToRemote := syncDefinition.ShouldSynchronizeToRemote()

	if err := MutatePodByInjectingContainers(pod, image, restoreOnInit, syncToRemote, params); err != nil {
		return errors.Wrap(err, "Cannot patch `kind: Pod`")
	}
	return nil
}
// NewPodMutator builds a PodMutationController from its three collaborators: the
// PodFilesystemSync cache, the typed client r and the Kubernetes client k.
// The controller is returned by value.
func NewPodMutator(cache *cache.Cache, r versioned.Interface, k kubernetes.Interface) PodMutationController {
	var controller PodMutationController
	controller.cache = cache
	controller.rktClient = r
	controller.kubeClient = k
	return controller
}