Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #79897: add ability for gce to bulk verify attached disks #80180: move getInstancesByName logic to helper function #80446: add unit tests for attacher DisksAreAttached and #81034

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 52 additions & 0 deletions pkg/cloudprovider/providers/gce/gce_disks.go
Expand Up @@ -441,6 +441,10 @@ type Disks interface {
// to the node with the specified NodeName.
DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error)

// BulkDisksAreAttached is a batch function to check if all corresponding disks are attached to the
// nodes specified with nodeName.
BulkDisksAreAttached(diskByNodes map[types.NodeName][]string) (map[types.NodeName]map[string]bool, error)

// CreateDisk creates a new PD with given properties. Tags are serialized
// as JSON into Description field.
CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error
Expand Down Expand Up @@ -637,6 +641,37 @@ func (g *Cloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (m
return attached, nil
}

// BulkDisksAreAttached is a batch function to check if all corresponding disks are attached to the
// nodes specified with nodeName.
func (g *Cloud) BulkDisksAreAttached(diskByNodes map[types.NodeName][]string) (map[types.NodeName]map[string]bool, error) {
instanceNames := []string{}
for nodeName := range diskByNodes {
instanceNames = append(instanceNames, mapNodeNameToInstanceName(nodeName))
}

// List all instances with the given instance names
// Then for each instance listed, add the disks attached to that instance to a map
listedInstances, err := g.getFoundInstanceByNames(instanceNames)
if err != nil {
return nil, fmt.Errorf("error listing instances: %v", err)
}
listedInstanceNamesToDisks := make(map[string][]*compute.AttachedDisk)
for _, instance := range listedInstances {
listedInstanceNamesToDisks[instance.Name] = instance.Disks
}

verifyDisksAttached := make(map[types.NodeName]map[string]bool)

// For each node and its desired attached disks that needs to be verified
for nodeName, disksToVerify := range diskByNodes {
instanceName := canonicalizeInstanceName(mapNodeNameToInstanceName(nodeName))
disksActuallyAttached := listedInstanceNamesToDisks[instanceName]
verifyDisksAttached[nodeName] = verifyDisksAttachedToNode(disksToVerify, disksActuallyAttached)
}

return verifyDisksAttached, nil
}

// CreateDisk creates a new Persistent Disk, with the specified name &
// size, in the specified zone. It stores specified tags encoded in
// JSON in Description field.
Expand Down Expand Up @@ -1024,3 +1059,20 @@ func isGCEError(err error, reason string) bool {
}
return false
}

// verifyDisksAttachedToNode takes in an slice of disks that should be attached to an instance, and the
// slice of disks actually attached to it. It returns a map verifying if the disks are actually attached.
func verifyDisksAttachedToNode(disksToVerify []string, disksActuallyAttached []*compute.AttachedDisk) map[string]bool {
verifiedDisks := make(map[string]bool)
diskNamesActuallyAttached := sets.NewString()
for _, disk := range disksActuallyAttached {
diskNamesActuallyAttached.Insert(disk.DeviceName)
}

// For every disk that's supposed to be attached, verify that it is
for _, diskName := range disksToVerify {
verifiedDisks[diskName] = diskNamesActuallyAttached.Has(diskName)
}

return verifiedDisks
}
34 changes: 22 additions & 12 deletions pkg/cloudprovider/providers/gce/gce_instances.go
Expand Up @@ -426,6 +426,19 @@ func (g *Cloud) AddAliasToInstance(nodeName types.NodeName, alias *net.IPNet) er
// Gets the named instances, returning cloudprovider.InstanceNotFound if any
// instance is not found
func (g *Cloud) getInstancesByNames(names []string) ([]*gceInstance, error) {
foundInstances, err := g.getFoundInstanceByNames(names)
if err != nil {
return nil, err
}
if len(foundInstances) != len(names) {
return nil, cloudprovider.InstanceNotFound
}
return foundInstances, nil
}

// Gets the named instances, returning a list of gceInstances it was able to find from the provided
// list of names.
func (g *Cloud) getFoundInstanceByNames(names []string) ([]*gceInstance, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()

Expand Down Expand Up @@ -472,20 +485,17 @@ func (g *Cloud) getInstancesByNames(names []string) ([]*gceInstance, error) {
}
}

if remaining > 0 {
var failed []string
for k := range found {
if found[k] == nil {
failed = append(failed, k)
}
var ret []*gceInstance
var failed []string
for name, instance := range found {
if instance != nil {
ret = append(ret, instance)
} else {
failed = append(failed, name)
}
klog.Errorf("Failed to retrieve instances: %v", failed)
return nil, cloudprovider.InstanceNotFound
}

var ret []*gceInstance
for _, instance := range found {
ret = append(ret, instance)
if len(failed) > 0 {
klog.Errorf("Failed to retrieve instances: %v", failed)
}

return ret, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/volume/gcepd/BUILD
Expand Up @@ -60,6 +60,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
Expand Down
35 changes: 35 additions & 0 deletions pkg/volume/gcepd/attacher.go
Expand Up @@ -141,6 +141,41 @@ func (attacher *gcePersistentDiskAttacher) VolumesAreAttached(specs []*volume.Sp
return volumesAttachedCheck, nil
}

func (attacher *gcePersistentDiskAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
diskNamesByNode := make(map[types.NodeName][]string)
volumeSpecToDiskName := make(map[*volume.Spec]string)

for nodeName, volumeSpecs := range volumesByNode {
diskNames := []string{}
for _, spec := range volumeSpecs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
klog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
diskNames = append(diskNames, volumeSource.PDName)
volumeSpecToDiskName[spec] = volumeSource.PDName
}
diskNamesByNode[nodeName] = diskNames
}

attachedDisksByNode, err := attacher.gceDisks.BulkDisksAreAttached(diskNamesByNode)
if err != nil {
return nil, err
}

for nodeName, volumeSpecs := range volumesByNode {
volumesAreAttachedToNode := make(map[*volume.Spec]bool)
for _, spec := range volumeSpecs {
diskName := volumeSpecToDiskName[spec]
volumesAreAttachedToNode[spec] = attachedDisksByNode[nodeName][diskName]
}
volumesAttachedCheck[nodeName] = volumesAreAttachedToNode
}
return volumesAttachedCheck, nil
}

// search Windows disk number by LUN
func getDiskID(pdName string, exec mount.Exec) (string, error) {
// TODO: replace Get-GcePdName with native windows support of Get-Disk, see issue #74674
Expand Down