Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement desiredWorld populator to sync up with informer #27576

Merged
merged 1 commit into from
Jun 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 16 additions & 1 deletion pkg/controller/volume/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/controller/volume/populator"
"k8s.io/kubernetes/pkg/controller/volume/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/statusupdater"
"k8s.io/kubernetes/pkg/types"
Expand All @@ -50,6 +51,10 @@ const (
// from its node. Once this time has expired, the controller will assume the
// node or kubelet are unresponsive and will detach the volume anyway.
reconcilerMaxWaitForUnmountDuration time.Duration = 3 * time.Minute

// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 5 * time.Minute
)

// AttachDetachController defines the operations supported by this controller.
Expand Down Expand Up @@ -119,6 +124,11 @@ func NewAttachDetachController(
adc.attacherDetacher,
adc.nodeStatusUpdater)

adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
desiredStateOfWorldPopulatorLoopSleepPeriod,
podInformer,
adc.desiredStateOfWorld)

return adc, nil
}

Expand Down Expand Up @@ -170,13 +180,18 @@ type attachDetachController struct {
// nodeStatusUpdater is used to update node status with the list of attached
// volumes
nodeStatusUpdater statusupdater.NodeStatusUpdater

// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
// populate the current pods using podInformer.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
}

func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
glog.Infof("Starting Attach Detach Controller")

go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)

<-stopCh
glog.Infof("Shutting down Attach Detach Controller")
Expand Down Expand Up @@ -300,7 +315,7 @@ func (adc *attachDetachController) processPodVolumes(
if addVolumes {
// Add volume to desired state of world
_, err := adc.desiredStateOfWorld.AddPod(
uniquePodName, volumeSpec, pod.Spec.NodeName)
uniquePodName, pod, volumeSpec, pod.Spec.NodeName)
if err != nil {
glog.V(10).Infof(
"Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v",
Expand Down
50 changes: 45 additions & 5 deletions pkg/controller/volume/cache/desired_state_of_world.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type DesiredStateOfWorld interface {
// should be attached to the specified node, the volume is implicitly added.
// If no node with the name nodeName exists in list of nodes managed by the
// attach/detach attached controller, an error is returned.
AddPod(podName types.UniquePodName, volumeSpec *volume.Spec, nodeName string) (api.UniqueVolumeName, error)
AddPod(podName types.UniquePodName, pod *api.Pod, volumeSpec *volume.Spec, nodeName string) (api.UniqueVolumeName, error)

// DeleteNode removes the given node from the list of nodes managed by the
// attach/detach controller.
Expand Down Expand Up @@ -90,13 +90,30 @@ type DesiredStateOfWorld interface {
// and the nodes they should be attached to based on the current desired
// state of the world.
GetVolumesToAttach() []VolumeToAttach

// GetPodToAdd generates and returns a map of pods based on the current desired
// state of world
GetPodToAdd() map[types.UniquePodName]PodToAdd
}

// VolumeToAttach represents a volume that should be attached to a node.
type VolumeToAttach struct {
operationexecutor.VolumeToAttach
}

// PodToAdd represents a pod that references the underlying volume and is
// scheduled to the underlying node.
type PodToAdd struct {
// pod contains the api object of pod
Pod *api.Pod

// volumeName contains the unique identifier for this volume.
VolumeName api.UniqueVolumeName

// nodeName contains the name of this node.
NodeName string
}

// NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorld {
return &desiredStateOfWorld{
Expand All @@ -119,7 +136,7 @@ type desiredStateOfWorld struct {
// nodeManaged represents a node that is being managed by the attach/detach
// controller.
type nodeManaged struct {
// nodName contains the name of this node.
// nodeName contains the name of this node.
nodeName string

// volumesToAttach is a map containing the set of volumes that should be
Expand All @@ -145,11 +162,14 @@ type volumeToAttach struct {
scheduledPods map[types.UniquePodName]pod
}

// The pod object represents a pod that references the underlying volume and is
// The pod represents a pod that references the underlying volume and is
// scheduled to the underlying node.
type pod struct {
// podName contains the name of this pod.
// podName contains the unique identifier for this pod
podName types.UniquePodName

// pod object contains the api object of pod
podObj *api.Pod
}

func (dsw *desiredStateOfWorld) AddNode(nodeName string) {
Expand All @@ -166,6 +186,7 @@ func (dsw *desiredStateOfWorld) AddNode(nodeName string) {

func (dsw *desiredStateOfWorld) AddPod(
podName types.UniquePodName,
podToAdd *api.Pod,
volumeSpec *volume.Spec,
nodeName string) (api.UniqueVolumeName, error) {
dsw.Lock()
Expand Down Expand Up @@ -204,11 +225,11 @@ func (dsw *desiredStateOfWorld) AddPod(
}
dsw.nodesManaged[nodeName].volumesToAttach[volumeName] = volumeObj
}

if _, podExists := volumeObj.scheduledPods[podName]; !podExists {
dsw.nodesManaged[nodeName].volumesToAttach[volumeName].scheduledPods[podName] =
pod{
podName: podName,
podObj: podToAdd,
}
}

Expand Down Expand Up @@ -309,3 +330,22 @@ func (dsw *desiredStateOfWorld) GetVolumesToAttach() []VolumeToAttach {

return volumesToAttach
}

func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd {
dsw.RLock()
defer dsw.RUnlock()

pods := make(map[types.UniquePodName]PodToAdd)
for nodeName, nodeObj := range dsw.nodesManaged {
for volumeName, volumeObj := range nodeObj.volumesToAttach {
for podUID, pod := range volumeObj.scheduledPods {
pods[podUID] = PodToAdd{
Pod: pod.podObj,
VolumeName: volumeName,
NodeName: nodeName,
}
}
}
}
return pods
}