Skip to content

Commit

Permalink
feat: add volume map to prevent race condition in volume creation
Browse files Browse the repository at this point in the history
  • Loading branch information
andyzhangx committed Oct 9, 2021
1 parent a4a99ec commit 8275f94
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
3 changes: 3 additions & 0 deletions pkg/blob/blob.go
Expand Up @@ -19,6 +19,7 @@ package blob
import (
"fmt"
"strings"
"sync"
"time"

"golang.org/x/net/context"
Expand Down Expand Up @@ -139,6 +140,8 @@ type Driver struct {
volumeLocks *volumeLocks
// only for nfs feature
subnetLockMap *util.LockMap
// a map storing all volumes created by this driver <volumeName, accountName>
volMap sync.Map
// a timed cache storing account search history (solves the account list throttling issue)
accountSearchCache *azcache.TimedCache
}
Expand Down
61 changes: 33 additions & 28 deletions pkg/blob/controllerserver.go
Expand Up @@ -46,18 +46,18 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}

volumeCapabilities := req.GetVolumeCapabilities()
name := req.GetName()
if len(name) == 0 {
volName := req.GetName()
if len(volName) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
}
if len(volumeCapabilities) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided")
}

if acquired := d.volumeLocks.TryAcquire(name); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, name)
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
}
defer d.volumeLocks.Release(name)
defer d.volumeLocks.Release(volName)

volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
requestGiB := int(util.RoundUpGiB(volSizeBytes))
Expand Down Expand Up @@ -190,30 +190,35 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
var accountKey string
accountName := account
if len(req.GetSecrets()) == 0 && accountName == "" {
lockKey := fmt.Sprintf("%s%s%s%s%s", storageAccountType, accountKind, resourceGroup, location, protocol)
// search in cache first
cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
if err != nil {
return nil, err
}
if cache != nil {
accountName = cache.(string)
if v, ok := d.volMap.Load(volName); ok {
accountName = v.(string)
} else {
d.volLockMap.LockEntry(lockKey)
err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
var retErr error
accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
if isRetriableError(retErr) {
klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
return false, nil
}
return true, retErr
})
d.volLockMap.UnlockEntry(lockKey)
lockKey := fmt.Sprintf("%s%s%s%s%s", storageAccountType, accountKind, resourceGroup, location, protocol)
// search in cache first
cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to ensure storage account: %v", err)
return nil, err
}
if cache != nil {
accountName = cache.(string)
} else {
d.volLockMap.LockEntry(lockKey)
err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
var retErr error
accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
if isRetriableError(retErr) {
klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
return false, nil
}
return true, retErr
})
d.volLockMap.UnlockEntry(lockKey)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to ensure storage account: %v", err)
}
d.accountSearchCache.Set(lockKey, accountName)
d.volMap.Store(volName, accountName)
}
d.accountSearchCache.Set(lockKey, accountName)
}
}
accountOptions.Name = accountName
Expand All @@ -226,7 +231,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)

validContainerName := containerName
if validContainerName == "" {
validContainerName = getValidContainerName(name, protocol)
validContainerName = getValidContainerName(volName, protocol)
parameters[containerNameField] = validContainerName
}

Expand Down Expand Up @@ -262,7 +267,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
if containerName != "" {
// add volume name as suffix to differentiate volumeID since "containerName" is specified
// not necessary for dynamic container name creation since volumeID already contains volume name
volumeID = volumeID + "#" + name
volumeID = volumeID + "#" + volName
}
klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)

Expand Down

0 comments on commit 8275f94

Please sign in to comment.