-
Notifications
You must be signed in to change notification settings - Fork 38.7k
/
util.go
816 lines (731 loc) · 29.7 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"fmt"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"time"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
utypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
storagehelpers "k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
"k8s.io/mount-utils"
utilexec "k8s.io/utils/exec"
"k8s.io/utils/io"
utilstrings "k8s.io/utils/strings"
)
const (
readyFileName = "ready"
// ControllerManagedAttachAnnotation is the key of the annotation on Node
// objects that indicates attach/detach operations for the node should be
// managed by the attach/detach controller
ControllerManagedAttachAnnotation string = "volumes.kubernetes.io/controller-managed-attach-detach"
// KeepTerminatedPodVolumesAnnotation is the key of the annotation on Node
// that decides if pod volumes are unmounted when pod is terminated
KeepTerminatedPodVolumesAnnotation string = "volumes.kubernetes.io/keep-terminated-pod-volumes"
// MountsInGlobalPDPath is name of the directory appended to a volume plugin
// name to create the place for volume mounts in the global PD path.
MountsInGlobalPDPath = "mounts"
// VolumeGidAnnotationKey is the of the annotation on the PersistentVolume
// object that specifies a supplemental GID.
VolumeGidAnnotationKey = "pv.beta.kubernetes.io/gid"
// VolumeDynamicallyCreatedByKey is the key of the annotation on PersistentVolume
// object created dynamically
VolumeDynamicallyCreatedByKey = "kubernetes.io/createdby"
// kubernetesPluginPathPrefix is the prefix of kubernetes plugin mount paths.
kubernetesPluginPathPrefix = "/plugins/kubernetes.io/"
)
// IsReady checks for the existence of a regular file
// called 'ready' in the given directory and returns
// true if that file exists.
func IsReady(dir string) bool {
readyFile := filepath.Join(dir, readyFileName)
s, err := os.Stat(readyFile)
if err != nil {
return false
}
if !s.Mode().IsRegular() {
klog.Errorf("ready-file is not a file: %s", readyFile)
return false
}
return true
}
// SetReady creates a file called 'ready' in the given
// directory. It logs an error if the file cannot be
// created.
func SetReady(dir string) {
if err := os.MkdirAll(dir, 0750); err != nil && !os.IsExist(err) {
klog.Errorf("Can't mkdir %s: %v", dir, err)
return
}
readyFile := filepath.Join(dir, readyFileName)
file, err := os.Create(readyFile)
if err != nil {
klog.Errorf("Can't touch %s: %v", readyFile, err)
return
}
file.Close()
}
// GetSecretForPod locates secret by name in the pod's namespace and returns secret map
func GetSecretForPod(pod *v1.Pod, secretName string, kubeClient clientset.Interface) (map[string]string, error) {
secret := make(map[string]string)
if kubeClient == nil {
return secret, fmt.Errorf("cannot get kube client")
}
secrets, err := kubeClient.CoreV1().Secrets(pod.Namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
if err != nil {
return secret, err
}
for name, data := range secrets.Data {
secret[name] = string(data)
}
return secret, nil
}
// GetSecretForPV locates secret by name and namespace, verifies the secret type, and returns secret map
func GetSecretForPV(secretNamespace, secretName, volumePluginName string, kubeClient clientset.Interface) (map[string]string, error) {
secret := make(map[string]string)
if kubeClient == nil {
return secret, fmt.Errorf("cannot get kube client")
}
secrets, err := kubeClient.CoreV1().Secrets(secretNamespace).Get(context.TODO(), secretName, metav1.GetOptions{})
if err != nil {
return secret, err
}
if secrets.Type != v1.SecretType(volumePluginName) {
return secret, fmt.Errorf("cannot get secret of type %s", volumePluginName)
}
for name, data := range secrets.Data {
secret[name] = string(data)
}
return secret, nil
}
// GetClassForVolume locates storage class by persistent volume
func GetClassForVolume(kubeClient clientset.Interface, pv *v1.PersistentVolume) (*storage.StorageClass, error) {
if kubeClient == nil {
return nil, fmt.Errorf("cannot get kube client")
}
className := storagehelpers.GetPersistentVolumeClass(pv)
if className == "" {
return nil, fmt.Errorf("volume has no storage class")
}
class, err := kubeClient.StorageV1().StorageClasses().Get(context.TODO(), className, metav1.GetOptions{})
if err != nil {
return nil, err
}
return class, nil
}
// LoadPodFromFile will read, decode, and return a Pod from a file.
func LoadPodFromFile(filePath string) (*v1.Pod, error) {
if filePath == "" {
return nil, fmt.Errorf("file path not specified")
}
podDef, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read file path %s: %+v", filePath, err)
}
if len(podDef) == 0 {
return nil, fmt.Errorf("file was empty: %s", filePath)
}
pod := &v1.Pod{}
codec := legacyscheme.Codecs.UniversalDecoder()
if err := apiruntime.DecodeInto(codec, podDef, pod); err != nil {
return nil, fmt.Errorf("failed decoding file: %v", err)
}
return pod, nil
}
// CalculateTimeoutForVolume calculates time for a Recycler pod to complete a
// recycle operation. The calculation and return value is either the
// minimumTimeout or the timeoutIncrement per Gi of storage size, whichever is
// greater.
func CalculateTimeoutForVolume(minimumTimeout, timeoutIncrement int, pv *v1.PersistentVolume) int64 {
giQty := resource.MustParse("1Gi")
pvQty := pv.Spec.Capacity[v1.ResourceStorage]
giSize := giQty.Value()
pvSize := pvQty.Value()
timeout := (pvSize / giSize) * int64(timeoutIncrement)
if timeout < int64(minimumTimeout) {
return int64(minimumTimeout)
}
return timeout
}
// GenerateVolumeName returns a PV name with clusterName prefix. The function
// should be used to generate a name of GCE PD or Cinder volume. It basically
// adds "<clusterName>-dynamic-" before the PV name, making sure the resulting
// string fits given length and cuts "dynamic" if not.
func GenerateVolumeName(clusterName, pvName string, maxLength int) string {
prefix := clusterName + "-dynamic"
pvLen := len(pvName)
// cut the "<clusterName>-dynamic" to fit full pvName into maxLength
// +1 for the '-' dash
if pvLen+1+len(prefix) > maxLength {
prefix = prefix[:maxLength-pvLen-1]
}
return prefix + "-" + pvName
}
// GetPath checks if the path from the mounter is empty.
func GetPath(mounter volume.Mounter) (string, error) {
path := mounter.GetPath()
if path == "" {
return "", fmt.Errorf("path is empty %s", reflect.TypeOf(mounter).String())
}
return path, nil
}
// UnmountViaEmptyDir delegates the tear down operation for secret, configmap, git_repo and downwardapi
// to empty_dir
func UnmountViaEmptyDir(dir string, host volume.VolumeHost, volName string, volSpec volume.Spec, podUID utypes.UID) error {
klog.V(3).Infof("Tearing down volume %v for pod %v at %v", volName, podUID, dir)
// Wrap EmptyDir, let it do the teardown.
wrapped, err := host.NewWrapperUnmounter(volName, volSpec, podUID)
if err != nil {
return err
}
return wrapped.TearDownAt(dir)
}
// MountOptionFromSpec extracts and joins mount options from volume spec with supplied options
func MountOptionFromSpec(spec *volume.Spec, options ...string) []string {
pv := spec.PersistentVolume
if pv != nil {
// Use beta annotation first
if mo, ok := pv.Annotations[v1.MountOptionAnnotation]; ok {
moList := strings.Split(mo, ",")
return JoinMountOptions(moList, options)
}
if len(pv.Spec.MountOptions) > 0 {
return JoinMountOptions(pv.Spec.MountOptions, options)
}
}
return options
}
// JoinMountOptions joins mount options eliminating duplicates
func JoinMountOptions(userOptions []string, systemOptions []string) []string {
allMountOptions := sets.NewString()
for _, mountOption := range userOptions {
if len(mountOption) > 0 {
allMountOptions.Insert(mountOption)
}
}
for _, mountOption := range systemOptions {
allMountOptions.Insert(mountOption)
}
return allMountOptions.List()
}
// ContainsAccessMode returns whether the requested mode is contained by modes
func ContainsAccessMode(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAccessMode) bool {
for _, m := range modes {
if m == mode {
return true
}
}
return false
}
// ContainsAllAccessModes returns whether all of the requested modes are contained by modes
func ContainsAllAccessModes(indexedModes []v1.PersistentVolumeAccessMode, requestedModes []v1.PersistentVolumeAccessMode) bool {
for _, mode := range requestedModes {
if !ContainsAccessMode(indexedModes, mode) {
return false
}
}
return true
}
// GetWindowsPath get a windows path
func GetWindowsPath(path string) string {
windowsPath := strings.Replace(path, "/", "\\", -1)
if strings.HasPrefix(windowsPath, "\\") {
windowsPath = "c:" + windowsPath
}
return windowsPath
}
// GetUniquePodName returns a unique identifier to reference a pod by
func GetUniquePodName(pod *v1.Pod) types.UniquePodName {
return types.UniquePodName(pod.UID)
}
// GetUniqueVolumeName returns a unique name representing the volume/plugin.
// Caller should ensure that volumeName is a name/ID uniquely identifying the
// actual backing device, directory, path, etc. for a particular volume.
// The returned name can be used to uniquely reference the volume, for example,
// to prevent operations (attach/detach or mount/unmount) from being triggered
// on the same volume.
func GetUniqueVolumeName(pluginName, volumeName string) v1.UniqueVolumeName {
return v1.UniqueVolumeName(fmt.Sprintf("%s/%s", pluginName, volumeName))
}
// GetUniqueVolumeNameFromSpecWithPod returns a unique volume name with pod
// name included. This is useful to generate different names for different pods
// on same volume.
func GetUniqueVolumeNameFromSpecWithPod(
podName types.UniquePodName, volumePlugin volume.VolumePlugin, volumeSpec *volume.Spec) v1.UniqueVolumeName {
return v1.UniqueVolumeName(
fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, volumeSpec.Name()))
}
// GetUniqueVolumeNameFromSpec uses the given VolumePlugin to generate a unique
// name representing the volume defined in the specified volume spec.
// This returned name can be used to uniquely reference the actual backing
// device, directory, path, etc. referenced by the given volumeSpec.
// If the given plugin does not support the volume spec, this returns an error.
func GetUniqueVolumeNameFromSpec(
volumePlugin volume.VolumePlugin,
volumeSpec *volume.Spec) (v1.UniqueVolumeName, error) {
if volumePlugin == nil {
return "", fmt.Errorf(
"volumePlugin should not be nil. volumeSpec.Name=%q",
volumeSpec.Name())
}
volumeName, err := volumePlugin.GetVolumeName(volumeSpec)
if err != nil || volumeName == "" {
return "", fmt.Errorf(
"failed to GetVolumeName from volumePlugin for volumeSpec %q err=%v",
volumeSpec.Name(),
err)
}
return GetUniqueVolumeName(
volumePlugin.GetPluginName(),
volumeName),
nil
}
// IsPodTerminated checks if pod is terminated
func IsPodTerminated(pod *v1.Pod, podStatus v1.PodStatus) bool {
// TODO: the guarantees provided by kubelet status are not sufficient to guarantee it's safe to ignore a deleted pod,
// even if everything is notRunning (kubelet does not guarantee that when pod status is waiting that it isn't trying
// to start a container).
return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.InitContainerStatuses) && notRunning(podStatus.ContainerStatuses) && notRunning(podStatus.EphemeralContainerStatuses))
}
// notRunning returns true if every status is terminated or waiting, or the status list
// is empty.
func notRunning(statuses []v1.ContainerStatus) bool {
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return false
}
}
return true
}
// SplitUniqueName splits the unique name to plugin name and volume name strings. It expects the uniqueName to follow
// the format plugin_name/volume_name and the plugin name must be namespaced as described by the plugin interface,
// i.e. namespace/plugin containing exactly one '/'. This means the unique name will always be in the form of
// plugin_namespace/plugin/volume_name, see k8s.io/kubernetes/pkg/volume/plugins.go VolumePlugin interface
// description and pkg/volume/util/volumehelper/volumehelper.go GetUniqueVolumeNameFromSpec that constructs
// the unique volume names.
func SplitUniqueName(uniqueName v1.UniqueVolumeName) (string, string, error) {
components := strings.SplitN(string(uniqueName), "/", 3)
if len(components) != 3 {
return "", "", fmt.Errorf("cannot split volume unique name %s to plugin/volume components", uniqueName)
}
pluginName := fmt.Sprintf("%s/%s", components[0], components[1])
return pluginName, components[2], nil
}
// NewSafeFormatAndMountFromHost creates a new SafeFormatAndMount with Mounter
// and Exec taken from given VolumeHost.
func NewSafeFormatAndMountFromHost(pluginName string, host volume.VolumeHost) *mount.SafeFormatAndMount {
mounter := host.GetMounter(pluginName)
exec := host.GetExec(pluginName)
return &mount.SafeFormatAndMount{Interface: mounter, Exec: exec}
}
// GetVolumeMode retrieves VolumeMode from pv.
// If the volume doesn't have PersistentVolume, it's an inline volume,
// should return volumeMode as filesystem to keep existing behavior.
func GetVolumeMode(volumeSpec *volume.Spec) (v1.PersistentVolumeMode, error) {
if volumeSpec == nil || volumeSpec.PersistentVolume == nil {
return v1.PersistentVolumeFilesystem, nil
}
if volumeSpec.PersistentVolume.Spec.VolumeMode != nil {
return *volumeSpec.PersistentVolume.Spec.VolumeMode, nil
}
return "", fmt.Errorf("cannot get volumeMode for volume: %v", volumeSpec.Name())
}
// GetPersistentVolumeClaimQualifiedName returns a qualified name for pvc.
func GetPersistentVolumeClaimQualifiedName(claim *v1.PersistentVolumeClaim) string {
return utilstrings.JoinQualifiedName(claim.GetNamespace(), claim.GetName())
}
// CheckVolumeModeFilesystem checks VolumeMode.
// If the mode is Filesystem, return true otherwise return false.
func CheckVolumeModeFilesystem(volumeSpec *volume.Spec) (bool, error) {
volumeMode, err := GetVolumeMode(volumeSpec)
if err != nil {
return true, err
}
if volumeMode == v1.PersistentVolumeBlock {
return false, nil
}
return true, nil
}
// CheckPersistentVolumeClaimModeBlock checks VolumeMode.
// If the mode is Block, return true otherwise return false.
func CheckPersistentVolumeClaimModeBlock(pvc *v1.PersistentVolumeClaim) bool {
return pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock
}
// IsWindowsUNCPath checks if path is prefixed with \\
// This can be used to skip any processing of paths
// that point to SMB shares, local named pipes and local UNC path
func IsWindowsUNCPath(goos, path string) bool {
if goos != "windows" {
return false
}
// Check for UNC prefix \\
if strings.HasPrefix(path, `\\`) {
return true
}
return false
}
// IsWindowsLocalPath checks if path is a local path
// prefixed with "/" or "\" like "/foo/bar" or "\foo\bar"
func IsWindowsLocalPath(goos, path string) bool {
if goos != "windows" {
return false
}
if IsWindowsUNCPath(goos, path) {
return false
}
if strings.Contains(path, ":") {
return false
}
if !(strings.HasPrefix(path, `/`) || strings.HasPrefix(path, `\`)) {
return false
}
return true
}
// MakeAbsolutePath convert path to absolute path according to GOOS
func MakeAbsolutePath(goos, path string) string {
if goos != "windows" {
return filepath.Clean("/" + path)
}
// These are all for windows
// If there is a colon, give up.
if strings.Contains(path, ":") {
return path
}
// If there is a slash, but no drive, add 'c:'
if strings.HasPrefix(path, "/") || strings.HasPrefix(path, "\\") {
return "c:" + path
}
// Otherwise, add 'c:\'
return "c:\\" + path
}
// MapBlockVolume is a utility function to provide a common way of mapping
// block device path for a specified volume and pod. This function should be
// called by volume plugins that implements volume.BlockVolumeMapper.Map() method.
func MapBlockVolume(
blkUtil volumepathhandler.BlockVolumePathHandler,
devicePath,
globalMapPath,
podVolumeMapPath,
volumeMapName string,
podUID utypes.UID,
) error {
// map devicePath to global node path as bind mount
mapErr := blkUtil.MapDevice(devicePath, globalMapPath, string(podUID), true /* bindMount */)
if mapErr != nil {
return fmt.Errorf("blkUtil.MapDevice failed. devicePath: %s, globalMapPath:%s, podUID: %s, bindMount: %v: %v",
devicePath, globalMapPath, string(podUID), true, mapErr)
}
// map devicePath to pod volume path
mapErr = blkUtil.MapDevice(devicePath, podVolumeMapPath, volumeMapName, false /* bindMount */)
if mapErr != nil {
return fmt.Errorf("blkUtil.MapDevice failed. devicePath: %s, podVolumeMapPath:%s, volumeMapName: %s, bindMount: %v: %v",
devicePath, podVolumeMapPath, volumeMapName, false, mapErr)
}
// Take file descriptor lock to keep a block device opened. Otherwise, there is a case
// that the block device is silently removed and attached another device with the same name.
// Container runtime can't handle this problem. To avoid unexpected condition fd lock
// for the block device is required.
_, mapErr = blkUtil.AttachFileDevice(filepath.Join(globalMapPath, string(podUID)))
if mapErr != nil {
return fmt.Errorf("blkUtil.AttachFileDevice failed. globalMapPath:%s, podUID: %s: %v",
globalMapPath, string(podUID), mapErr)
}
return nil
}
// UnmapBlockVolume is a utility function to provide a common way of unmapping
// block device path for a specified volume and pod. This function should be
// called by volume plugins that implements volume.BlockVolumeMapper.Map() method.
func UnmapBlockVolume(
blkUtil volumepathhandler.BlockVolumePathHandler,
globalUnmapPath,
podDeviceUnmapPath,
volumeMapName string,
podUID utypes.UID,
) error {
// Release file descriptor lock.
err := blkUtil.DetachFileDevice(filepath.Join(globalUnmapPath, string(podUID)))
if err != nil {
return fmt.Errorf("blkUtil.DetachFileDevice failed. globalUnmapPath:%s, podUID: %s: %v",
globalUnmapPath, string(podUID), err)
}
// unmap devicePath from pod volume path
unmapDeviceErr := blkUtil.UnmapDevice(podDeviceUnmapPath, volumeMapName, false /* bindMount */)
if unmapDeviceErr != nil {
return fmt.Errorf("blkUtil.DetachFileDevice failed. podDeviceUnmapPath:%s, volumeMapName: %s, bindMount: %v: %v",
podDeviceUnmapPath, volumeMapName, false, unmapDeviceErr)
}
// unmap devicePath from global node path
unmapDeviceErr = blkUtil.UnmapDevice(globalUnmapPath, string(podUID), true /* bindMount */)
if unmapDeviceErr != nil {
return fmt.Errorf("blkUtil.DetachFileDevice failed. globalUnmapPath:%s, podUID: %s, bindMount: %v: %v",
globalUnmapPath, string(podUID), true, unmapDeviceErr)
}
return nil
}
// GetPluginMountDir returns the global mount directory name appended
// to the given plugin name's plugin directory
func GetPluginMountDir(host volume.VolumeHost, name string) string {
mntDir := filepath.Join(host.GetPluginDir(name), MountsInGlobalPDPath)
return mntDir
}
// IsLocalEphemeralVolume determines whether the argument is a local ephemeral
// volume vs. some other type
// Local means the volume is using storage from the local disk that is managed by kubelet.
// Ephemeral means the lifecycle of the volume is the same as the Pod.
func IsLocalEphemeralVolume(volume v1.Volume) bool {
return volume.GitRepo != nil ||
(volume.EmptyDir != nil && volume.EmptyDir.Medium == v1.StorageMediumDefault) ||
volume.ConfigMap != nil
}
// GetLocalPersistentVolumeNodeNames returns the node affinity node name(s) for
// local PersistentVolumes. nil is returned if the PV does not have any
// specific node affinity node selector terms and match expressions.
// PersistentVolume with node affinity has select and match expressions
// in the form of:
//
// nodeAffinity:
// required:
// nodeSelectorTerms:
// - matchExpressions:
// - key: kubernetes.io/hostname
// operator: In
// values:
// - <node1>
// - <node2>
func GetLocalPersistentVolumeNodeNames(pv *v1.PersistentVolume) []string {
if pv == nil || pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
return nil
}
var result sets.Set[string]
for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
var nodes sets.Set[string]
for _, matchExpr := range term.MatchExpressions {
if matchExpr.Key == v1.LabelHostname && matchExpr.Operator == v1.NodeSelectorOpIn {
if nodes == nil {
nodes = sets.New(matchExpr.Values...)
} else {
nodes = nodes.Intersection(sets.New(matchExpr.Values...))
}
}
}
result = result.Union(nodes)
}
return sets.List(result)
}
// GetPodVolumeNames returns names of volumes that are used in a pod,
// either as filesystem mount or raw block device, together with list
// of all SELinux contexts of all containers that use the volumes.
func GetPodVolumeNames(pod *v1.Pod) (mounts sets.String, devices sets.String, seLinuxContainerContexts map[string][]*v1.SELinuxOptions) {
mounts = sets.NewString()
devices = sets.NewString()
seLinuxContainerContexts = make(map[string][]*v1.SELinuxOptions)
podutil.VisitContainers(&pod.Spec, podutil.AllFeatureEnabledContainers(), func(container *v1.Container, containerType podutil.ContainerType) bool {
var seLinuxOptions *v1.SELinuxOptions
if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
effectiveContainerSecurity := securitycontext.DetermineEffectiveSecurityContext(pod, container)
if effectiveContainerSecurity != nil {
// No DeepCopy, SELinuxOptions is already a copy of Pod's or container's SELinuxOptions
seLinuxOptions = effectiveContainerSecurity.SELinuxOptions
}
}
if container.VolumeMounts != nil {
for _, mount := range container.VolumeMounts {
mounts.Insert(mount.Name)
if seLinuxOptions != nil {
seLinuxContainerContexts[mount.Name] = append(seLinuxContainerContexts[mount.Name], seLinuxOptions.DeepCopy())
}
}
}
if container.VolumeDevices != nil {
for _, device := range container.VolumeDevices {
devices.Insert(device.Name)
}
}
return true
})
return
}
// FsUserFrom returns FsUser of pod, which is determined by the runAsUser
// attributes.
func FsUserFrom(pod *v1.Pod) *int64 {
var fsUser *int64
// Exclude ephemeral containers because SecurityContext is not allowed.
podutil.VisitContainers(&pod.Spec, podutil.InitContainers|podutil.Containers, func(container *v1.Container, containerType podutil.ContainerType) bool {
runAsUser, ok := securitycontext.DetermineEffectiveRunAsUser(pod, container)
// One container doesn't specify user or there are more than one
// non-root UIDs.
if !ok || (fsUser != nil && *fsUser != *runAsUser) {
fsUser = nil
return false
}
if fsUser == nil {
fsUser = runAsUser
}
return true
})
return fsUser
}
// HasMountRefs checks if the given mountPath has mountRefs.
// TODO: this is a workaround for the unmount device issue caused by gci mounter.
// In GCI cluster, if gci mounter is used for mounting, the container started by mounter
// script will cause additional mounts created in the container. Since these mounts are
// irrelevant to the original mounts, they should be not considered when checking the
// mount references. The current solution is to filter out those mount paths that contain
// the k8s plugin suffix of original mount path.
func HasMountRefs(mountPath string, mountRefs []string) bool {
// A mountPath typically is like
// /var/lib/kubelet/plugins/kubernetes.io/some-plugin/mounts/volume-XXXX
// Mount refs can look like
// /home/somewhere/var/lib/kubelet/plugins/kubernetes.io/some-plugin/...
// but if /var/lib/kubelet is mounted to a different device a ref might be like
// /mnt/some-other-place/kubelet/plugins/kubernetes.io/some-plugin/...
// Neither of the above should be counted as a mount ref as those are handled
// by the kubelet. What we're concerned about is a path like
// /data/local/some/manual/mount
// As unmounting could interrupt usage from that mountpoint.
//
// So instead of looking for the entire /var/lib/... path, the plugins/kubernetes.io/
// suffix is trimmed off and searched for.
//
// If there isn't a /plugins/... path, the whole mountPath is used instead.
pathToFind := mountPath
if i := strings.Index(mountPath, kubernetesPluginPathPrefix); i > -1 {
pathToFind = mountPath[i:]
}
for _, ref := range mountRefs {
if !strings.Contains(ref, pathToFind) {
return true
}
}
return false
}
// WriteVolumeCache flush disk data given the specified mount path
func WriteVolumeCache(deviceMountPath string, exec utilexec.Interface) error {
// If runtime os is windows, execute Write-VolumeCache powershell command on the disk
if runtime.GOOS == "windows" {
cmdString := "Get-Volume -FilePath $env:mountpath | Write-Volumecache"
cmd := exec.Command("powershell", "/c", cmdString)
env := append(os.Environ(), fmt.Sprintf("mountpath=%s", deviceMountPath))
cmd.SetEnv(env)
klog.V(8).Infof("Executing command: %q", cmdString)
output, err := cmd.CombinedOutput()
klog.Infof("command (%q) execeuted: %v, output: %q", cmdString, err, string(output))
if err != nil {
return fmt.Errorf("command (%q) failed: %v, output: %q", cmdString, err, string(output))
}
}
// For linux runtime, it skips because unmount will automatically flush disk data
return nil
}
// IsMultiAttachAllowed checks if attaching this volume to multiple nodes is definitely not allowed/possible.
// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
// attacher to fail fast in such cases.
// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
func IsMultiAttachAllowed(volumeSpec *volume.Spec) bool {
if volumeSpec == nil {
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
return true
}
if volumeSpec.Volume != nil {
// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
if volumeSpec.Volume.AzureDisk != nil ||
volumeSpec.Volume.Cinder != nil {
return false
}
}
// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to
// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes
if volumeSpec.PersistentVolume != nil {
// Check for persistent volume types which do not fail when trying to multi-attach
if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
// No access mode specified so we don't know for sure. Let the attacher fail if needed
return true
}
// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false
for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
return true
}
}
return false
}
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
return true
}
// IsAttachableVolume checks if the given volumeSpec is an attachable volume or not
func IsAttachableVolume(volumeSpec *volume.Spec, volumePluginMgr *volume.VolumePluginMgr) bool {
attachableVolumePlugin, _ := volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if attachableVolumePlugin != nil {
volumeAttacher, err := attachableVolumePlugin.NewAttacher()
if err == nil && volumeAttacher != nil {
return true
}
}
return false
}
// IsDeviceMountableVolume checks if the given volumeSpec is an device mountable volume or not
func IsDeviceMountableVolume(volumeSpec *volume.Spec, volumePluginMgr *volume.VolumePluginMgr) bool {
deviceMountableVolumePlugin, _ := volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
if deviceMountableVolumePlugin != nil {
volumeDeviceMounter, err := deviceMountableVolumePlugin.NewDeviceMounter()
if err == nil && volumeDeviceMounter != nil {
return true
}
}
return false
}
// GetReliableMountRefs calls mounter.GetMountRefs and retries on IsInconsistentReadError.
// To be used in volume reconstruction of volume plugins that don't have any protection
// against mounting a single volume on multiple nodes (such as attach/detach).
func GetReliableMountRefs(mounter mount.Interface, mountPath string) ([]string, error) {
var paths []string
var lastErr error
err := wait.PollImmediate(10*time.Millisecond, time.Minute, func() (bool, error) {
var err error
paths, err = mounter.GetMountRefs(mountPath)
if io.IsInconsistentReadError(err) {
lastErr = err
return false, nil
}
if err != nil {
return false, err
}
return true, nil
})
if err == wait.ErrWaitTimeout {
return nil, lastErr
}
return paths, err
}