Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
444 lines (377 sloc) 14.5 KB
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package cache implements data structures used by the attach/detach controller
to keep track of volumes, the nodes they are attached to, and the pods that
reference them.
*/
package cache
import (
"fmt"
"sync"
"k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
)
// DesiredStateOfWorld defines a set of thread-safe operations supported on
// the attach/detach controller's desired state of the world cache.
// This cache contains nodes->volumes->pods where nodes are all the nodes
// managed by the attach/detach controller, volumes are all the volumes that
// should be attached to the specified node, and pods are the pods that
// reference the volume and are scheduled to that node.
// Note: This is distinct from the DesiredStateOfWorld implemented by the
// kubelet volume manager. They both keep track of different objects. This
// contains attach/detach controller specific state.
type DesiredStateOfWorld interface {
// AddNode adds the given node to the list of nodes managed by the attach/
// detach controller.
// If the node already exists this is a no-op.
// keepTerminatedPodVolumes is a property of the node that determines
// if volumes should be mounted and attached for terminated pods.
AddNode(nodeName k8stypes.NodeName, keepTerminatedPodVolumes bool)
// AddPod adds the given pod to the list of pods that reference the
// specified volume and is scheduled to the specified node.
// A unique volumeName is generated from the volumeSpec and returned on
// success.
// If the pod already exists under the specified volume, this is a no-op.
// If volumeSpec is not an attachable volume plugin, an error is returned.
// If no volume with the name volumeName exists in the list of volumes that
// should be attached to the specified node, the volume is implicitly added.
// If no node with the name nodeName exists in list of nodes managed by the
// attach/detach attached controller, an error is returned.
AddPod(podName types.UniquePodName, pod *v1.Pod, volumeSpec *volume.Spec, nodeName k8stypes.NodeName) (v1.UniqueVolumeName, error)
// DeleteNode removes the given node from the list of nodes managed by the
// attach/detach controller.
// If the node does not exist this is a no-op.
// If the node exists but has 1 or more child volumes, an error is returned.
DeleteNode(nodeName k8stypes.NodeName) error
// DeletePod removes the given pod from the list of pods that reference the
// specified volume and are scheduled to the specified node.
// If no pod exists in the list of pods that reference the specified volume
// and are scheduled to the specified node, this is a no-op.
// If a node with the name nodeName does not exist in the list of nodes
// managed by the attach/detach attached controller, this is a no-op.
// If no volume with the name volumeName exists in the list of managed
// volumes under the specified node, this is a no-op.
// If after deleting the pod, the specified volume contains no other child
// pods, the volume is also deleted.
DeletePod(podName types.UniquePodName, volumeName v1.UniqueVolumeName, nodeName k8stypes.NodeName)
// NodeExists returns true if the node with the specified name exists in
// the list of nodes managed by the attach/detach controller.
NodeExists(nodeName k8stypes.NodeName) bool
// VolumeExists returns true if the volume with the specified name exists
// in the list of volumes that should be attached to the specified node by
// the attach detach controller.
VolumeExists(volumeName v1.UniqueVolumeName, nodeName k8stypes.NodeName) bool
// GetVolumesToAttach generates and returns a list of volumes to attach
// and the nodes they should be attached to based on the current desired
// state of the world.
GetVolumesToAttach() []VolumeToAttach
// GetPodToAdd generates and returns a map of pods based on the current desired
// state of world
GetPodToAdd() map[types.UniquePodName]PodToAdd
// GetKeepTerminatedPodVolumesForNode determines if node wants volumes to be
// mounted and attached for terminated pods
GetKeepTerminatedPodVolumesForNode(k8stypes.NodeName) bool
// Mark multi-attach error as reported to prevent spamming multiple
// events for same error
SetMultiAttachError(v1.UniqueVolumeName, k8stypes.NodeName)
// GetPodsOnNodes returns list of pods ("namespace/name") that require
// given volume on given nodes.
GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod
}
// VolumeToAttach represents a volume that should be attached to a node.
type VolumeToAttach struct {
operationexecutor.VolumeToAttach
}
// PodToAdd represents a pod that references the underlying volume and is
// scheduled to the underlying node.
type PodToAdd struct {
// pod contains the api object of pod
Pod *v1.Pod
// volumeName contains the unique identifier for this volume.
VolumeName v1.UniqueVolumeName
// nodeName contains the name of this node.
NodeName k8stypes.NodeName
}
// NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorld {
return &desiredStateOfWorld{
nodesManaged: make(map[k8stypes.NodeName]nodeManaged),
volumePluginMgr: volumePluginMgr,
}
}
type desiredStateOfWorld struct {
// nodesManaged is a map containing the set of nodes managed by the attach/
// detach controller. The key in this map is the name of the node and the
// value is a node object containing more information about the node.
nodesManaged map[k8stypes.NodeName]nodeManaged
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
sync.RWMutex
}
// nodeManaged represents a node that is being managed by the attach/detach
// controller.
type nodeManaged struct {
// nodeName contains the name of this node.
nodeName k8stypes.NodeName
// volumesToAttach is a map containing the set of volumes that should be
// attached to this node. The key in the map is the name of the volume and
// the value is a volumeToAttach object containing more information about the volume.
volumesToAttach map[v1.UniqueVolumeName]volumeToAttach
// keepTerminatedPodVolumes determines if for terminated pods(on this node) - volumes
// should be kept mounted and attached.
keepTerminatedPodVolumes bool
}
// The volumeToAttach object represents a volume that should be attached to a node.
type volumeToAttach struct {
// multiAttachErrorReported indicates whether the multi-attach error has been reported for the given volume.
// It is used to prevent reporting the error from being reported more than once for a given volume.
multiAttachErrorReported bool
// volumeName contains the unique identifier for this volume.
volumeName v1.UniqueVolumeName
// spec is the volume spec containing the specification for this volume.
// Used to generate the volume plugin object, and passed to attach/detach
// methods.
spec *volume.Spec
// scheduledPods is a map containing the set of pods that reference this
// volume and are scheduled to the underlying node. The key in the map is
// the name of the pod and the value is a pod object containing more
// information about the pod.
scheduledPods map[types.UniquePodName]pod
}
// The pod represents a pod that references the underlying volume and is
// scheduled to the underlying node.
type pod struct {
// podName contains the unique identifier for this pod
podName types.UniquePodName
// pod object contains the api object of pod
podObj *v1.Pod
}
func (dsw *desiredStateOfWorld) AddNode(nodeName k8stypes.NodeName, keepTerminatedPodVolumes bool) {
dsw.Lock()
defer dsw.Unlock()
if _, nodeExists := dsw.nodesManaged[nodeName]; !nodeExists {
dsw.nodesManaged[nodeName] = nodeManaged{
nodeName: nodeName,
volumesToAttach: make(map[v1.UniqueVolumeName]volumeToAttach),
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
}
}
}
func (dsw *desiredStateOfWorld) AddPod(
podName types.UniquePodName,
podToAdd *v1.Pod,
volumeSpec *volume.Spec,
nodeName k8stypes.NodeName) (v1.UniqueVolumeName, error) {
dsw.Lock()
defer dsw.Unlock()
nodeObj, nodeExists := dsw.nodesManaged[nodeName]
if !nodeExists {
return "", fmt.Errorf(
"no node with the name %q exists in the list of managed nodes",
nodeName)
}
attachableVolumePlugin, err := dsw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || attachableVolumePlugin == nil {
if attachableVolumePlugin == nil {
err = fmt.Errorf("plugin do not support attachment")
}
return "", fmt.Errorf(
"failed to get AttachablePlugin from volumeSpec for volume %q err=%v",
volumeSpec.Name(),
err)
}
volumeName, err := util.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to get UniqueVolumeName from volumeSpec for plugin=%q and volume=%q err=%v",
attachableVolumePlugin.GetPluginName(),
volumeSpec.Name(),
err)
}
volumeObj, volumeExists := nodeObj.volumesToAttach[volumeName]
if !volumeExists {
volumeObj = volumeToAttach{
multiAttachErrorReported: false,
volumeName: volumeName,
spec: volumeSpec,
scheduledPods: make(map[types.UniquePodName]pod),
}
dsw.nodesManaged[nodeName].volumesToAttach[volumeName] = volumeObj
}
if _, podExists := volumeObj.scheduledPods[podName]; !podExists {
dsw.nodesManaged[nodeName].volumesToAttach[volumeName].scheduledPods[podName] =
pod{
podName: podName,
podObj: podToAdd,
}
}
return volumeName, nil
}
func (dsw *desiredStateOfWorld) DeleteNode(nodeName k8stypes.NodeName) error {
dsw.Lock()
defer dsw.Unlock()
nodeObj, nodeExists := dsw.nodesManaged[nodeName]
if !nodeExists {
return nil
}
if len(nodeObj.volumesToAttach) > 0 {
return fmt.Errorf(
"failed to delete node %q from list of nodes managed by attach/detach controller--the node still contains %v volumes in its list of volumes to attach",
nodeName,
len(nodeObj.volumesToAttach))
}
delete(
dsw.nodesManaged,
nodeName)
return nil
}
func (dsw *desiredStateOfWorld) DeletePod(
podName types.UniquePodName,
volumeName v1.UniqueVolumeName,
nodeName k8stypes.NodeName) {
dsw.Lock()
defer dsw.Unlock()
nodeObj, nodeExists := dsw.nodesManaged[nodeName]
if !nodeExists {
return
}
volumeObj, volumeExists := nodeObj.volumesToAttach[volumeName]
if !volumeExists {
return
}
if _, podExists := volumeObj.scheduledPods[podName]; !podExists {
return
}
delete(
dsw.nodesManaged[nodeName].volumesToAttach[volumeName].scheduledPods,
podName)
if len(volumeObj.scheduledPods) == 0 {
delete(
dsw.nodesManaged[nodeName].volumesToAttach,
volumeName)
}
}
func (dsw *desiredStateOfWorld) NodeExists(nodeName k8stypes.NodeName) bool {
dsw.RLock()
defer dsw.RUnlock()
_, nodeExists := dsw.nodesManaged[nodeName]
return nodeExists
}
func (dsw *desiredStateOfWorld) VolumeExists(
volumeName v1.UniqueVolumeName, nodeName k8stypes.NodeName) bool {
dsw.RLock()
defer dsw.RUnlock()
nodeObj, nodeExists := dsw.nodesManaged[nodeName]
if nodeExists {
if _, volumeExists := nodeObj.volumesToAttach[volumeName]; volumeExists {
return true
}
}
return false
}
func (dsw *desiredStateOfWorld) SetMultiAttachError(
volumeName v1.UniqueVolumeName,
nodeName k8stypes.NodeName) {
dsw.Lock()
defer dsw.Unlock()
nodeObj, nodeExists := dsw.nodesManaged[nodeName]
if nodeExists {
if volumeObj, volumeExists := nodeObj.volumesToAttach[volumeName]; volumeExists {
volumeObj.multiAttachErrorReported = true
dsw.nodesManaged[nodeName].volumesToAttach[volumeName] = volumeObj
}
}
}
// GetKeepTerminatedPodVolumesForNode determines if node wants volumes to be
// mounted and attached for terminated pods
func (dsw *desiredStateOfWorld) GetKeepTerminatedPodVolumesForNode(nodeName k8stypes.NodeName) bool {
dsw.RLock()
defer dsw.RUnlock()
if nodeName == "" {
return false
}
if node, ok := dsw.nodesManaged[nodeName]; ok {
return node.keepTerminatedPodVolumes
}
return false
}
func (dsw *desiredStateOfWorld) GetVolumesToAttach() []VolumeToAttach {
dsw.RLock()
defer dsw.RUnlock()
volumesToAttach := make([]VolumeToAttach, 0 /* len */, len(dsw.nodesManaged) /* cap */)
for nodeName, nodeObj := range dsw.nodesManaged {
for volumeName, volumeObj := range nodeObj.volumesToAttach {
volumesToAttach = append(volumesToAttach,
VolumeToAttach{
VolumeToAttach: operationexecutor.VolumeToAttach{
MultiAttachErrorReported: volumeObj.multiAttachErrorReported,
VolumeName: volumeName,
VolumeSpec: volumeObj.spec,
NodeName: nodeName,
ScheduledPods: getPodsFromMap(volumeObj.scheduledPods),
}})
}
}
return volumesToAttach
}
// Construct a list of v1.Pod objects from the given pod map
func getPodsFromMap(podMap map[types.UniquePodName]pod) []*v1.Pod {
pods := make([]*v1.Pod, 0, len(podMap))
for _, pod := range podMap {
pods = append(pods, pod.podObj)
}
return pods
}
func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd {
dsw.RLock()
defer dsw.RUnlock()
pods := make(map[types.UniquePodName]PodToAdd)
for nodeName, nodeObj := range dsw.nodesManaged {
for volumeName, volumeObj := range nodeObj.volumesToAttach {
for podUID, pod := range volumeObj.scheduledPods {
pods[podUID] = PodToAdd{
Pod: pod.podObj,
VolumeName: volumeName,
NodeName: nodeName,
}
}
}
}
return pods
}
func (dsw *desiredStateOfWorld) GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod {
dsw.RLock()
defer dsw.RUnlock()
pods := []*v1.Pod{}
for _, nodeName := range nodes {
node, ok := dsw.nodesManaged[nodeName]
if !ok {
continue
}
volume, ok := node.volumesToAttach[volumeName]
if !ok {
continue
}
for _, pod := range volume.scheduledPods {
pods = append(pods, pod.podObj)
}
}
return pods
}
You can’t perform that action at this time.