AWS: move volume attachment map to cloud level
The problem is that attachments are now done on the master, but we were
only caching the attachment map persistently for the local instance.  For
any other node the map was rebuilt (and therefore cleared) on every call,
creating a race between assigning a device mapping and attaching the volume.

Issue kubernetes#29324
justinsb authored and saad-ali committed Sep 21, 2016
1 parent c72d71e commit e070bbb
Showing 1 changed file with 49 additions and 34 deletions.
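
To make the fix concrete, here is a minimal, self-contained sketch of the pattern the diff below adopts: a single cloud-level map of in-progress device assignments, keyed by node name and guarded by its own mutex, so reservations survive even though per-node awsInstance objects are rebuilt. The names here (cloud, beginAttaching, and so on) are simplified stand-ins for illustration, not the real cloudprovider types.

package main

import (
	"fmt"
	"sync"
)

type mountDevice string

// cloud stands in for the AWS Cloud object: the "assigned but not yet
// attached" devices live here, at the cloud level, keyed by node name.
type cloud struct {
	attachingMutex sync.Mutex
	attaching      map[string]map[mountDevice]string // nodeName -> device -> volumeID
}

func newCloud() *cloud {
	return &cloud{attaching: make(map[string]map[mountDevice]string)}
}

// beginAttaching records that volumeID has been assigned device on nodeName
// but has not yet been attached (roughly what getMountDevice does when it
// assigns a new device).
func (c *cloud) beginAttaching(nodeName string, device mountDevice, volumeID string) {
	c.attachingMutex.Lock()
	defer c.attachingMutex.Unlock()
	m := c.attaching[nodeName]
	if m == nil {
		m = make(map[mountDevice]string)
		c.attaching[nodeName] = m
	}
	m[device] = volumeID
}

// endAttaching removes the in-progress entry; it returns true if the entry
// was found (and removed), false otherwise.
func (c *cloud) endAttaching(nodeName string, device mountDevice, volumeID string) bool {
	c.attachingMutex.Lock()
	defer c.attachingMutex.Unlock()
	existing, found := c.attaching[nodeName][device]
	if !found || existing != volumeID {
		return false
	}
	delete(c.attaching[nodeName], device)
	return true
}

func main() {
	c := newCloud()

	// An attach operation reserves the device up front...
	c.beginAttaching("node-a", "ba", "vol-123")

	// ...and releases it once the operation has ended (success or failure),
	// which is what the attachEnded flag and the deferred endAttaching call
	// in AttachDisk arrange.
	fmt.Println(c.endAttaching("node-a", "ba", "vol-123")) // true: entry released
	fmt.Println(c.endAttaching("node-a", "ba", "vol-123")) // false: nothing in progress
}

Because the map belongs to the Cloud object, which is built once per process, a reservation made while attaching a volume to any node remains visible when a second attach request for the same node arrives.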
83 changes: 49 additions & 34 deletions pkg/cloudprovider/providers/aws/aws.go
@@ -279,6 +279,12 @@ type Cloud struct {
mutex sync.Mutex
lastNodeNames sets.String
lastInstancesByNodeNames []*ec2.Instance

// We keep an active list of devices we have assigned but not yet
// attached, to avoid a race condition where we assign a device mapping
// and then get a second request before we attach the volume
attachingMutex sync.Mutex
attaching map[ /*nodeName*/ string]map[mountDevice]string
}

var _ Volumes = &Cloud{}
@@ -645,6 +651,10 @@ func azToRegion(az string) (string, error) {
// newAWSCloud creates a new instance of AWSCloud.
// AWSProvider and instanceId are primarily for tests
func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
// We have some state in the Cloud object - in particular the attaching map
// Log so that if we are building multiple Cloud objects, it is obvious!
glog.Infof("Building AWS cloudprovider")

metadata, err := awsServices.Metadata()
if err != nil {
return nil, fmt.Errorf("error creating AWS metadata client: %v", err)
@@ -691,6 +701,8 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
metadata: metadata,
cfg: cfg,
region: regionName,

attaching: make(map[string]map[mountDevice]string),
}

selfAWSInstance, err := awsCloud.buildSelfAWSInstance()
@@ -1010,13 +1022,6 @@ type awsInstance struct {

// instance type
instanceType string

mutex sync.Mutex

// We keep an active list of devices we have assigned but not yet
// attached, to avoid a race condition where we assign a device mapping
// and then get a second request before we attach the volume
attaching map[mountDevice]string
}

// newAWSInstance creates a new awsInstance object
@@ -1035,8 +1040,6 @@ func newAWSInstance(ec2Service EC2, instance *ec2.Instance) *awsInstance {
subnetID: aws.StringValue(instance.SubnetId),
}

self.attaching = make(map[mountDevice]string)

return self
}

@@ -1070,18 +1073,12 @@ func (i *awsInstance) describeInstance() (*ec2.Instance, error) {
// Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
// If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
// Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
instanceType := i.getInstanceType()
if instanceType == nil {
return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID)
}

// We lock to prevent concurrent mounts from conflicting
// We may still conflict if someone calls the API concurrently,
// but the AWS API will then fail one of the two attach operations
i.mutex.Lock()
defer i.mutex.Unlock()

info, err := i.describeInstance()
if err != nil {
return "", false, err
@@ -1101,7 +1098,13 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou
deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
}

for mountDevice, volume := range i.attaching {
// We lock to prevent concurrent mounts from conflicting
// We may still conflict if someone calls the API concurrently,
// but the AWS API will then fail one of the two attach operations
c.attachingMutex.Lock()
defer c.attachingMutex.Unlock()

for mountDevice, volume := range c.attaching[i.nodeName] {
deviceMappings[mountDevice] = volume
}

@@ -1136,27 +1139,34 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou
return "", false, fmt.Errorf("Too many EBS volumes attached to node %s.", i.nodeName)
}

i.attaching[chosen] = volumeID
attaching := c.attaching[i.nodeName]
if attaching == nil {
attaching = make(map[mountDevice]string)
c.attaching[i.nodeName] = attaching
}
attaching[chosen] = volumeID
glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID)

return chosen, false, nil
}

func (i *awsInstance) endAttaching(volumeID string, mountDevice mountDevice) {
i.mutex.Lock()
defer i.mutex.Unlock()
// endAttaching removes the entry from the "attachments in progress" map
// It returns true if it was found (and removed), false otherwise
func (c *Cloud) endAttaching(i *awsInstance, volumeID string, mountDevice mountDevice) bool {
c.attachingMutex.Lock()
defer c.attachingMutex.Unlock()

existingVolumeID, found := i.attaching[mountDevice]
existingVolumeID, found := c.attaching[i.nodeName][mountDevice]
if !found {
glog.Errorf("endAttaching on non-allocated device")
return
return false
}
if volumeID != existingVolumeID {
glog.Errorf("endAttaching on device assigned to different volume")
return
return false
}
glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeID)
delete(i.attaching, mountDevice)
glog.V(2).Infof("Releasing in-process attachment entry: %s -> volume %s", mountDevice, volumeID)
delete(c.attaching[i.nodeName], mountDevice)
return true
}

type awsDisk struct {
@@ -1348,7 +1358,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
return "", errors.New("AWS volumes cannot be mounted read-only")
}

mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, true)
mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, disk.awsID, true)
if err != nil {
return "", err
}
@@ -1360,11 +1370,13 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
ec2Device := "/dev/xvd" + string(mountDevice)

// attachEnded is set to true if the attach operation completed
// (successfully or not)
// (successfully or not), and is thus no longer in progress
attachEnded := false
defer func() {
if attachEnded {
awsInstance.endAttaching(disk.awsID, mountDevice)
if !c.endAttaching(awsInstance, disk.awsID, mountDevice) {
glog.Errorf("endAttaching called when attach not in progress")
}
}
}()

@@ -1390,6 +1402,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
return "", err
}

// The attach operation has finished
attachEnded = true

return hostDevice, nil
@@ -1416,14 +1429,14 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
return "", err
}

mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, false)
mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, disk.awsID, false)
if err != nil {
return "", err
}

if !alreadyAttached {
glog.Warning("DetachDisk called on non-attached disk: ", diskName)
// TODO: Continue? Tolerate non-attached error in DetachVolume?
glog.Warningf("DetachDisk called on non-attached disk: %s", diskName)
// TODO: Continue? Tolerate non-attached error from the AWS DetachVolume call?
}

request := ec2.DetachVolumeInput{
@@ -1445,7 +1458,9 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
}

if mountDevice != "" {
awsInstance.endAttaching(disk.awsID, mountDevice)
c.endAttaching(awsInstance, disk.awsID, mountDevice)
// We don't check the return value - we don't really expect the attachment to have been
// in progress, though it might have been
}

hostDevicePath := "/dev/xvd" + string(mountDevice)