/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubevirt

import (
	gocontext "context"
	"fmt"
	"time"

	"github.com/pkg/errors"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	kubedrain "k8s.io/kubectl/pkg/drain"
	kubevirtv1 "kubevirt.io/api/core/v1"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/controllers/noderefutil"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
	"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/context"
	"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/ssh"
	"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
)

const (
	vmiDeleteGraceTimeoutDurationSeconds = 600 // 10 minutes
)

// Machine implements a service for managing the KubeVirt VM hosting a Kubernetes node.
type Machine struct {
	client             client.Client
	namespace          string
	machineContext     *context.MachineContext
	vmiInstance        *kubevirtv1.VirtualMachineInstance
	vmInstance         *kubevirtv1.VirtualMachine
	sshKeys            *ssh.ClusterNodeSshKeys
	getCommandExecutor func(string, *ssh.ClusterNodeSshKeys) ssh.VMCommandExecutor
}

// NewMachine returns a new Machine service for the given context.
func NewMachine(ctx *context.MachineContext, client client.Client, namespace string, sshKeys *ssh.ClusterNodeSshKeys) (*Machine, error) {
	machine := &Machine{
		client:             client,
		namespace:          namespace,
		machineContext:     ctx,
		vmiInstance:        nil,
		vmInstance:         nil,
		sshKeys:            sshKeys,
		getCommandExecutor: ssh.NewVMCommandExecutor,
	}

	namespacedName := types.NamespacedName{Namespace: namespace, Name: ctx.KubevirtMachine.Name}
	vm := &kubevirtv1.VirtualMachine{}
	vmi := &kubevirtv1.VirtualMachineInstance{}

	// Get the active running VMI if it exists.
	err := client.Get(ctx.Context, namespacedName, vmi)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return nil, err
		}
	} else {
		machine.vmiInstance = vmi
	}

	// Get the top level VM object if it exists.
	err = client.Get(ctx.Context, namespacedName, vm)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return nil, err
		}
	} else {
		machine.vmInstance = vm
	}

	return machine, nil
}

// IsTerminal reports whether the VM is being asked to terminate, or has terminated
// in a way that it will never recover from.
func (m *Machine) IsTerminal() (bool, string, error) {
	if m.vmInstance == nil || m.vmiInstance == nil {
		// The VM/VMI haven't been created yet.
		return false, "", nil
	}

	// VMI is being asked to terminate gracefully due to node drain.
	if !m.vmiInstance.IsFinal() &&
		!m.vmiInstance.IsMigratable() &&
		m.vmiInstance.Status.EvacuationNodeName != "" {
		// The VM's infra node is being drained and the VM is not live migratable.
		// We need to report a FailureReason so the MachineHealthCheck and
		// MachineSet controllers will gracefully take the VM down.
		return true, "The Machine's VM pod is marked for eviction due to infra node drain.", nil
	}

	// The infrav1.KubevirtMachineVMTerminalLabel is a way for users or automation to mark
	// a VM as being in a terminal state that requires remediation. It is used
	// by the functional test suite to exercise remediation and can also be applied
	// by users as a way to manually trigger remediation.
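	// (Hypothetical usage example, not defined by this package: a user could mark a VM
	// terminal with `kubectl label virtualmachine <vm-name> -n <namespace> <terminal-label-key>=<reason>`,
	// where <terminal-label-key> is the label key defined by infrav1.KubevirtMachineVMTerminalLabel.)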
	terminalReason, ok := m.vmInstance.Labels[infrav1.KubevirtMachineVMTerminalLabel]
	if ok {
		return true, fmt.Sprintf("VM's %s label has the vm marked as being terminal with reason [%s]", infrav1.KubevirtMachineVMTerminalLabel, terminalReason), nil
	}

	// Also check the VMI for this label.
	terminalReason, ok = m.vmiInstance.Labels[infrav1.KubevirtMachineVMTerminalLabel]
	if ok {
		return true, fmt.Sprintf("VMI's %s label has the vm marked as being terminal with reason [%s]", infrav1.KubevirtMachineVMTerminalLabel, terminalReason), nil
	}

	runStrategy, err := m.vmInstance.RunStrategy()
	if err != nil {
		return false, "", err
	}

	switch runStrategy {
	case kubevirtv1.RunStrategyAlways:
		// VM should recover if it is down.
		return false, "", nil
	case kubevirtv1.RunStrategyManual:
		// If the VM is manually controlled, we stay out of the loop.
		return false, "", nil
	case kubevirtv1.RunStrategyHalted, kubevirtv1.RunStrategyOnce:
		if m.vmiInstance.IsFinal() {
			return true, "VMI has reached a permanent finalized state", nil
		}
		return false, "", nil
	case kubevirtv1.RunStrategyRerunOnFailure:
		// RerunOnFailure only restarts the VMI when it fails; a VMI that succeeded
		// (shut down cleanly) will never be restarted, so it is terminal.
		if m.vmiInstance.Status.Phase == kubevirtv1.Succeeded {
			return true, "VMI has reached a permanent finalized state", nil
		}
		return false, "", nil
	}

	return false, "", nil
}

// Exists checks if the VM has been provisioned already.
func (m *Machine) Exists() bool {
	return m.vmInstance != nil
}

// Create creates a new VM for this machine.
func (m *Machine) Create(ctx gocontext.Context) error {
	m.machineContext.Logger.Info(fmt.Sprintf("Creating VM with role '%s'...", nodeRole(m.machineContext)))

	virtualMachine := newVirtualMachineFromKubevirtMachine(m.machineContext, m.namespace)

	mutateFn := func() (err error) {
		if virtualMachine.Labels == nil {
			virtualMachine.Labels = map[string]string{}
		}
		if virtualMachine.Spec.Template.ObjectMeta.Labels == nil {
			virtualMachine.Spec.Template.ObjectMeta.Labels = map[string]string{}
		}

		virtualMachine.Labels[clusterv1.ClusterNameLabel] = m.machineContext.Cluster.Name

		virtualMachine.Labels[infrav1.KubevirtMachineNameLabel] = m.machineContext.KubevirtMachine.Name
		virtualMachine.Labels[infrav1.KubevirtMachineNamespaceLabel] = m.machineContext.KubevirtMachine.Namespace

		virtualMachine.Spec.Template.ObjectMeta.Labels[infrav1.KubevirtMachineNameLabel] = m.machineContext.KubevirtMachine.Name
		virtualMachine.Spec.Template.ObjectMeta.Labels[infrav1.KubevirtMachineNamespaceLabel] = m.machineContext.KubevirtMachine.Namespace

		return nil
	}

	if _, err := controllerutil.CreateOrUpdate(ctx, m.client, virtualMachine, mutateFn); err != nil {
		return err
	}

	return nil
}

// hasReadyCondition returns true if the VMI has the Ready condition set to true.
func (m *Machine) hasReadyCondition() bool {
	if m.vmiInstance == nil {
		return false
	}

	for _, cond := range m.vmiInstance.Status.Conditions {
		if cond.Type == kubevirtv1.VirtualMachineInstanceReady &&
			cond.Status == corev1.ConditionTrue {
			return true
		}
	}

	return false
}

// Address returns the IP address of the VM.
func (m *Machine) Address() string {
	if m.vmiInstance != nil && len(m.vmiInstance.Status.Interfaces) > 0 {
		return m.vmiInstance.Status.Interfaces[0].IP
	}

	return ""
}

// IsReady checks if the VM is ready.
func (m *Machine) IsReady() bool {
	return m.hasReadyCondition()
}

// SupportsCheckingIsBootstrapped checks if we have a method of checking
// that this bootstrapper has completed.
func (m *Machine) SupportsCheckingIsBootstrapped() bool {
	// Right now, we can only check whether bootstrapping has
	// completed if we are using a bootstrapper that allows
	// us to inject ssh keys into the guest.
	if m.sshKeys != nil {
		return m.machineContext.HasInjectedCapkSSHKeys(m.sshKeys.PublicKey)
	}

	return false
}

// IsBootstrapped checks if the VM is bootstrapped with Kubernetes.
func (m *Machine) IsBootstrapped() bool {
	// The CheckStrategy value is already sanitized by the apiserver.
	switch m.machineContext.KubevirtMachine.Spec.BootstrapCheckSpec.CheckStrategy {
	case "none":
		// Skip the bootstrap check and always report success.
		return true
	case "":
		fallthrough // ssh is the default check strategy, fall through
	case "ssh":
		return m.IsBootstrappedWithSSH()
	default:
		// Since the CRD CheckStrategy field is validated by an enum, this case should never be hit.
		return false
	}
}

// IsBootstrappedWithSSH checks if the VM is bootstrapped with Kubernetes using the SSH strategy.
func (m *Machine) IsBootstrappedWithSSH() bool {
	if !m.IsReady() || m.sshKeys == nil {
		return false
	}

	executor := m.getCommandExecutor(m.Address(), m.sshKeys)
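	// The sentinel file below is assumed to be written inside the guest once bootstrapping
	// completes; this follows the Cluster API convention of creating
	// /run/cluster-api/bootstrap-success.complete from a post-bootstrap command, but the
	// exact mechanism depends on how the bootstrap data is configured.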
	output, err := executor.ExecuteCommand("cat /run/cluster-api/bootstrap-success.complete")
	if err != nil || output != "success" {
		return false
	}

	return true
}

// GenerateProviderID generates the KubeVirt provider ID to be used for the NodeRef.
func (m *Machine) GenerateProviderID() (string, error) {
	if m.vmiInstance == nil {
		return "", errors.New("Underlying Kubevirt VM is NOT running")
	}

	providerID := fmt.Sprintf("kubevirt://%s", m.machineContext.KubevirtMachine.Name)

	return providerID, nil
}

// Delete deletes VM for this machine.
func (m *Machine) Delete() error {
	namespacedName := types.NamespacedName{Namespace: m.namespace, Name: m.machineContext.KubevirtMachine.Name}
	vm := &kubevirtv1.VirtualMachine{}
	if err := m.client.Get(m.machineContext.Context, namespacedName, vm); err != nil {
		if apierrors.IsNotFound(err) {
			m.machineContext.Logger.Info("VM does not exist, nothing to do.")
			return nil
		}

		return errors.Wrapf(err, "failed to retrieve VM to delete")
	}

	if err := m.client.Delete(gocontext.Background(), vm); err != nil {
		return errors.Wrapf(err, "failed to delete VM")
	}

	return nil
}
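
// DrainNodeIfNeeded drains the workload-cluster node backed by this machine's VMI
// before the VMI is deleted, when the VMI is marked for eviction and uses the
// External eviction strategy. It returns a requeue duration (0 when no requeue is
// needed) and an error. Once the drain completes, or the grace period recorded in
// the VmiDeletionGraceTime annotation expires, the VMI is deleted and the
// annotation is removed.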
func (m *Machine) DrainNodeIfNeeded(wrkldClstr workloadcluster.WorkloadCluster) (time.Duration, error) {
	if m.vmiInstance == nil || !m.shouldGracefulDeleteVMI() {
		if _, anntExists := m.machineContext.KubevirtMachine.Annotations[infrav1.VmiDeletionGraceTime]; anntExists {
			if err := m.removeGracePeriodAnnotation(); err != nil {
				return 100 * time.Millisecond, err
			}
		}
		return 0, nil
	}

	exceeded, err := m.drainGracePeriodExceeded()
	if err != nil {
		return 0, err
	}

	if !exceeded {
		retryDuration, err := m.drainNode(wrkldClstr)
		if err != nil {
			return 0, err
		}

		if retryDuration > 0 {
			return retryDuration, nil
		}
	}

	// Now that the node is drained (or vmiDeleteGraceTimeoutDurationSeconds has passed), we can delete the VMI.
	propagationPolicy := metav1.DeletePropagationForeground
	err = m.client.Delete(m.machineContext, m.vmiInstance, &client.DeleteOptions{PropagationPolicy: &propagationPolicy})
	if err != nil {
		m.machineContext.Logger.Error(err, "failed to delete VirtualMachineInstance")
		return 0, err
	}

	if err = m.removeGracePeriodAnnotation(); err != nil {
		return 100 * time.Millisecond, err
	}

	// Requeue to force reading the VMI again.
	return time.Second * 10, nil
}
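
// removeGracePeriodAnnotationPatch is a JSON patch that drops the grace-time annotation
// from the KubevirtMachine. infrav1.VmiDeletionGraceTimeEscape is used here because
// JSON-pointer paths escape "/" in the annotation key as "~1".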
const removeGracePeriodAnnotationPatch = `[{"op": "remove", "path": "/metadata/annotations/` + infrav1.VmiDeletionGraceTimeEscape + `"}]`
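
// removeGracePeriodAnnotation removes the VmiDeletionGraceTime annotation from the
// KubevirtMachine once graceful deletion has finished or no longer applies.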
func (m *Machine) removeGracePeriodAnnotation() error {
	patch := client.RawPatch(types.JSONPatchType, []byte(removeGracePeriodAnnotationPatch))
	if err := m.client.Patch(m.machineContext, m.machineContext.KubevirtMachine, patch); err != nil {
		return fmt.Errorf("failed to remove the %s annotation from the KubeVirtMachine %s; %w", infrav1.VmiDeletionGraceTime, m.machineContext.KubevirtMachine.Name, err)
	}

	return nil
}
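
// shouldGracefulDeleteVMI returns true when the VMI should be drained before deletion:
// the VMI is not already being deleted, it uses the External eviction strategy, and
// KubeVirt has marked it for eviction by setting Status.EvacuationNodeName.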
func (m *Machine) shouldGracefulDeleteVMI() bool {
	if m.vmiInstance.DeletionTimestamp != nil {
		m.machineContext.Logger.V(4).Info("DrainNode: the virtualMachineInstance is already in deletion process. Nothing to do here")
		return false
	}

	if m.vmiInstance.Spec.EvictionStrategy == nil || *m.vmiInstance.Spec.EvictionStrategy != kubevirtv1.EvictionStrategyExternal {
		m.machineContext.Logger.V(4).Info("DrainNode: graceful deletion is not supported for virtualMachineInstance. Nothing to do here")
		return false
	}

	// KubeVirt will set the EvacuationNodeName field in case of guest node eviction. If the field is not set, there is
	// nothing to do.
	if len(m.vmiInstance.Status.EvacuationNodeName) == 0 {
		m.machineContext.Logger.V(4).Info("DrainNode: the virtualMachineInstance is not marked for eviction. Nothing to do here")
		return false
	}

	return true
}

// drainGracePeriodExceeded waits up to vmiDeleteGraceTimeoutDurationSeconds for the node
// to be drained. Once that time has passed, we stop waiting.
func (m *Machine) drainGracePeriodExceeded() (bool, error) {
	if graceTime, found := m.machineContext.KubevirtMachine.Annotations[infrav1.VmiDeletionGraceTime]; found {
		deletionGraceTime, err := time.Parse(time.RFC3339, graceTime)
		if err != nil { // wrong format - rewrite
			if err = m.setVmiDeletionGraceTime(); err != nil {
				return false, err
			}
		} else {
			return time.Now().UTC().After(deletionGraceTime), nil
		}
	} else {
		if err := m.setVmiDeletionGraceTime(); err != nil {
			return false, err
		}
	}

	return false, nil
}
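
// setVmiDeletionGraceTime records the drain deadline (now + vmiDeleteGraceTimeoutDurationSeconds)
// in the VmiDeletionGraceTime annotation on the KubevirtMachine, using a merge patch.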
func (m *Machine) setVmiDeletionGraceTime() error {
	m.machineContext.Logger.Info(fmt.Sprintf("setting the %s annotation", infrav1.VmiDeletionGraceTime))
	graceTime := time.Now().Add(vmiDeleteGraceTimeoutDurationSeconds * time.Second).UTC().Format(time.RFC3339)
	patch := fmt.Sprintf(`{"metadata":{"annotations":{"%s": "%s"}}}`, infrav1.VmiDeletionGraceTime, graceTime)
	patchRequest := client.RawPatch(types.MergePatchType, []byte(patch))

	if err := m.client.Patch(m.machineContext, m.machineContext.KubevirtMachine, patchRequest); err != nil {
		return fmt.Errorf("failed to add the %s annotation to the KubeVirtMachine %s; %w", infrav1.VmiDeletionGraceTime, m.machineContext.KubevirtMachine.Name, err)
	}

	return nil
}

// drainNode drains a node in the tenant cluster.
// It returns a retry duration (0 when no retry is needed) and an error when the
// drain failed in a way that should abort the current reconcile.
func (m *Machine) drainNode(wrkldClstr workloadcluster.WorkloadCluster) (time.Duration, error) {
	kubeClient, err := wrkldClstr.GenerateWorkloadClusterK8sClient(m.machineContext)
	if err != nil {
		m.machineContext.Logger.Error(err, "Error creating a remote client while deleting Machine, won't retry")
		return 0, fmt.Errorf("failed to get client to remote cluster; %w", err)
	}

	nodeName := m.vmiInstance.Status.EvacuationNodeName
	node, err := kubeClient.CoreV1().Nodes().Get(m.machineContext, nodeName, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			// If an admin deletes the node directly, we'll end up here.
			m.machineContext.Logger.Error(err, "Could not find node from noderef, it may have already been deleted")
			return 0, nil
		}

		return 0, fmt.Errorf("unable to get node %q: %w", nodeName, err)
	}
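
	// The drain settings below appear to mirror the ones the Cluster API machine controller
	// uses when draining nodes (force, ignore DaemonSets, delete emptyDir data, keep each
	// pod's own grace period), with a short timeout so other machines can still be reconciled.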
	drainer := &kubedrain.Helper{
		Client:              kubeClient,
		Ctx:                 m.machineContext,
		Force:               true,
		IgnoreAllDaemonSets: true,
		DeleteEmptyDirData:  true,
		GracePeriodSeconds:  -1,
		// If a pod is not evicted in 20 seconds, retry the eviction next time the
		// machine gets reconciled again (to allow other machines to be reconciled).
		Timeout: 20 * time.Second,
		OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
			verbStr := "Deleted"
			if usingEviction {
				verbStr = "Evicted"
			}
			m.machineContext.Logger.Info(fmt.Sprintf("%s pod from Node", verbStr),
				"pod", fmt.Sprintf("%s/%s", pod.Name, pod.Namespace))
		},
		Out: writer{m.machineContext.Logger.Info},
		ErrOut: writer{func(msg string, keysAndValues ...interface{}) {
			m.machineContext.Logger.Error(nil, msg, keysAndValues...)
		}},
	}

	if noderefutil.IsNodeUnreachable(node) {
		// When the node is unreachable and some pods are not evicted for as long as this timeout, we ignore them.
		drainer.SkipWaitForDeleteTimeoutSeconds = 60 * 5 // 5 minutes
	}

	if err = kubedrain.RunCordonOrUncordon(drainer, node, true); err != nil {
		// Machine will be re-reconciled after a cordon failure.
		m.machineContext.Logger.Error(err, "Cordon failed")
		return 0, errors.Errorf("unable to cordon node %s: %v", nodeName, err)
	}

	if err = kubedrain.RunNodeDrain(drainer, node.Name); err != nil {
		// Machine will be re-reconciled after a drain failure.
		m.machineContext.Logger.Error(err, "Drain failed, retry in a second", "node name", nodeName)
		return time.Second, nil
	}

	m.machineContext.Logger.Info("Drain successful", "node name", nodeName)
	return 0, nil
}

// writer implements io.Writer interface as a pass-through for klog.
type writer struct {
	logFunc func(msg string, keysAndValues ...interface{})
}

// Write passes string(p) into writer's logFunc and always returns len(p).
func (w writer) Write(p []byte) (n int, err error) {
	w.logFunc(string(p))
	return len(p), nil
}