diff --git a/pkg/blob/blob.go b/pkg/blob/blob.go index 6b52a6c52..57b4a3e4f 100644 --- a/pkg/blob/blob.go +++ b/pkg/blob/blob.go @@ -19,6 +19,7 @@ package blob import ( "fmt" "strings" + "sync" "time" "golang.org/x/net/context" @@ -139,6 +140,8 @@ type Driver struct { volumeLocks *volumeLocks // only for nfs feature subnetLockMap *util.LockMap + // a map storing all volumes created by this driver + volMap sync.Map // a timed cache storing acount search history (solve account list throttling issue) accountSearchCache *azcache.TimedCache } diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index 7e31b9e4d..bcd453c8d 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -46,18 +46,18 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) } volumeCapabilities := req.GetVolumeCapabilities() - name := req.GetName() - if len(name) == 0 { + volName := req.GetName() + if len(volName) == 0 { return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided") } if len(volumeCapabilities) == 0 { return nil, status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided") } - if acquired := d.volumeLocks.TryAcquire(name); !acquired { - return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, name) + if acquired := d.volumeLocks.TryAcquire(volName); !acquired { + return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName) } - defer d.volumeLocks.Release(name) + defer d.volumeLocks.Release(volName) volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes()) requestGiB := int(util.RoundUpGiB(volSizeBytes)) @@ -190,30 +190,35 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) var accountKey string accountName := account if len(req.GetSecrets()) == 0 && accountName == "" { - lockKey := fmt.Sprintf("%s%s%s%s%s", storageAccountType, accountKind, resourceGroup, location, protocol) - // search in cache first - cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault) - if err != nil { - return nil, err - } - if cache != nil { - accountName = cache.(string) + if v, ok := d.volMap.Load(volName); ok { + accountName = v.(string) } else { - d.volLockMap.LockEntry(lockKey) - err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) { - var retErr error - accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol) - if isRetriableError(retErr) { - klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr) - return false, nil - } - return true, retErr - }) - d.volLockMap.UnlockEntry(lockKey) + lockKey := fmt.Sprintf("%s%s%s%s%s", storageAccountType, accountKind, resourceGroup, location, protocol) + // search in cache first + cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault) if err != nil { - return nil, status.Errorf(codes.Internal, "failed to ensure storage account: %v", err) + return nil, err + } + if cache != nil { + accountName = cache.(string) + } else { + d.volLockMap.LockEntry(lockKey) + err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) { + var retErr error + accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol) + if isRetriableError(retErr) { + klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr) + return false, nil + } + return true, retErr + }) + d.volLockMap.UnlockEntry(lockKey) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to ensure storage account: %v", err) + } + d.accountSearchCache.Set(lockKey, accountName) + d.volMap.Store(volName, accountName) } - d.accountSearchCache.Set(lockKey, accountName) } } accountOptions.Name = accountName @@ -226,7 +231,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) validContainerName := containerName if validContainerName == "" { - validContainerName = getValidContainerName(name, protocol) + validContainerName = getValidContainerName(volName, protocol) parameters[containerNameField] = validContainerName } @@ -262,7 +267,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) if containerName != "" { // add volume name as suffix to differentiate volumeID since "containerName" is specified // not necessary for dynamic container name creation since volumeID already contains volume name - volumeID = volumeID + "#" + name + volumeID = volumeID + "#" + volName } klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)