Skip to content

Commit

Permalink
Merge pull request #2164 from MartinForReal/shafan/snapshot
Browse files Browse the repository at this point in the history
Refactor: migrate snapshot client to track2 sdk
  • Loading branch information
k8s-ci-robot committed Jan 22, 2024
2 parents fff0161 + dab80dc commit 572b2cd
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 246 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v4 v4.3.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0
github.com/Azure/go-autorest/autorest v0.11.29
github.com/Azure/go-autorest/autorest/date v0.3.0
github.com/Azure/go-autorest/autorest/mocks v0.4.2
github.com/container-storage-interface/spec v1.9.0
github.com/golang/protobuf v1.5.3
Expand Down Expand Up @@ -56,6 +55,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.23 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
Expand Down
20 changes: 12 additions & 8 deletions pkg/azuredisk/azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,19 +450,23 @@ func (d *DriverCore) getHostUtil() hostUtil {
}

// getSnapshotCompletionPercent returns the completion percent of snapshot
func (d *DriverCore) getSnapshotCompletionPercent(ctx context.Context, subsID, resourceGroup, snapshotName string) (float64, error) {
copySnapshot, rerr := d.cloud.SnapshotsClient.Get(ctx, subsID, resourceGroup, snapshotName)
if rerr != nil {
return 0.0, rerr.Error()
func (d *DriverCore) getSnapshotCompletionPercent(ctx context.Context, subsID, resourceGroup, snapshotName string) (float32, error) {
snapshotClient, err := d.clientFactory.GetSnapshotClientForSub(subsID)
if err != nil {
return 0.0, err
}
copySnapshot, err := snapshotClient.Get(ctx, resourceGroup, snapshotName)
if err != nil {
return 0.0, err
}

if copySnapshot.SnapshotProperties == nil || copySnapshot.SnapshotProperties.CompletionPercent == nil {
if copySnapshot.Properties == nil || copySnapshot.Properties.CompletionPercent == nil {
// If CompletionPercent is nil, it means the snapshot is complete
klog.V(2).Infof("snapshot(%s) under rg(%s) has no SnapshotProperties or CompletionPercent is nil", snapshotName, resourceGroup)
return 100.0, nil
}

return *copySnapshot.SnapshotProperties.CompletionPercent, nil
return *copySnapshot.Properties.CompletionPercent, nil
}

// waitForSnapshotReady wait for completionPercent of snapshot is 100.0
Expand All @@ -472,7 +476,7 @@ func (d *DriverCore) waitForSnapshotReady(ctx context.Context, subsID, resourceG
return err
}

if completionPercent >= float64(100.0) {
if completionPercent >= float32(100.0) {
klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
return nil
}
Expand All @@ -487,7 +491,7 @@ func (d *DriverCore) waitForSnapshotReady(ctx context.Context, subsID, resourceG
return err
}

if completionPercent >= float64(100.0) {
if completionPercent >= float32(100.0) {
klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
return nil
}
Expand Down
53 changes: 25 additions & 28 deletions pkg/azuredisk/azuredisk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute"
"github.com/Azure/go-autorest/autorest/date"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"golang.org/x/sync/errgroup"
Expand All @@ -38,10 +37,9 @@ import (
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/cloud-provider-azure/pkg/azclient/diskclient/mock_diskclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azclient/mock_azclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/snapshotclient/mocksnapshotclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azclient/snapshotclient/mock_snapshotclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/mockvmclient"
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
"sigs.k8s.io/cloud-provider-azure/pkg/retry"
)

func TestNewDriverV1(t *testing.T) {
Expand Down Expand Up @@ -323,17 +321,15 @@ func TestWaitForSnapshot(t *testing.T) {
intervel := 1 * time.Millisecond
timeout := 10 * time.Millisecond
snapshotID := "test"
snapshot := compute.Snapshot{
SnapshotProperties: &compute.SnapshotProperties{},
ID: &snapshotID}
snapshot := &armcompute.Snapshot{
Properties: &armcompute.SnapshotProperties{},
ID: &snapshotID}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockSnapshotClient := mocksnapshotclient.NewMockInterface(ctrl)
d.getCloud().SnapshotsClient = mockSnapshotClient
rerr := &retry.Error{
RawError: fmt.Errorf("invalid snapshotID"),
}
mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, rerr).AnyTimes()
mockSnapshotClient := mock_snapshotclient.NewMockInterface(ctrl)
d.getClientFactory().(*mock_azclient.MockClientFactory).EXPECT().GetSnapshotClientForSub(subID).Return(mockSnapshotClient, nil).AnyTimes()

mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, fmt.Errorf("invalid snapshotID")).AnyTimes()
err := d.waitForSnapshotReady(context.Background(), subID, resourceGroup, snapshotID, intervel, timeout)

wantErr := true
Expand Down Expand Up @@ -362,21 +358,22 @@ func TestWaitForSnapshot(t *testing.T) {
snapshotID := "test"
location := "loc"
provisioningState := "succeeded"
snapshot := compute.Snapshot{
SnapshotProperties: &compute.SnapshotProperties{
TimeCreated: &date.Time{},
snapshot := &armcompute.Snapshot{
Properties: &armcompute.SnapshotProperties{
TimeCreated: &time.Time{},
ProvisioningState: &provisioningState,
DiskSizeGB: &DiskSize,
CreationData: &compute.CreationData{SourceResourceID: &volumeID},
CompletionPercent: pointer.Float64(0.0),
CreationData: &armcompute.CreationData{SourceResourceID: &volumeID},
CompletionPercent: pointer.Float32(0.0),
},
Location: &location,
ID: &snapshotID}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockSnapshotClient := mocksnapshotclient.NewMockInterface(ctrl)
d.getCloud().SnapshotsClient = mockSnapshotClient
mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, nil).AnyTimes()
mockSnapshotClient := mock_snapshotclient.NewMockInterface(ctrl)
d.getClientFactory().(*mock_azclient.MockClientFactory).EXPECT().GetSnapshotClientForSub(subID).Return(mockSnapshotClient, nil).AnyTimes()

mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, nil).AnyTimes()
err := d.waitForSnapshotReady(context.Background(), subID, resourceGroup, snapshotID, intervel, timeout)

wantErr := true
Expand Down Expand Up @@ -405,21 +402,21 @@ func TestWaitForSnapshot(t *testing.T) {
snapshotID := "test"
location := "loc"
provisioningState := "succeeded"
snapshot := compute.Snapshot{
SnapshotProperties: &compute.SnapshotProperties{
TimeCreated: &date.Time{},
snapshot := &armcompute.Snapshot{
Properties: &armcompute.SnapshotProperties{
TimeCreated: &time.Time{},
ProvisioningState: &provisioningState,
DiskSizeGB: &DiskSize,
CreationData: &compute.CreationData{SourceResourceID: &volumeID},
CompletionPercent: pointer.Float64(100.0),
CreationData: &armcompute.CreationData{SourceResourceID: &volumeID},
CompletionPercent: pointer.Float32(100.0),
},
Location: &location,
ID: &snapshotID}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockSnapshotClient := mocksnapshotclient.NewMockInterface(ctrl)
d.getCloud().SnapshotsClient = mockSnapshotClient
mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, nil).AnyTimes()
mockSnapshotClient := mock_snapshotclient.NewMockInterface(ctrl)
d.getClientFactory().(*mock_azclient.MockClientFactory).EXPECT().GetSnapshotClientForSub(subID).Return(mockSnapshotClient, nil).AnyTimes()
mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, nil).AnyTimes()
err := d.waitForSnapshotReady(context.Background(), subID, resourceGroup, snapshotID, intervel, timeout)

wantErr := false
Expand Down
81 changes: 46 additions & 35 deletions pkg/azuredisk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute"
"github.com/container-storage-interface/spec/lib/go/csi"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -957,10 +957,10 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
tags[k] = &value
}

snapshot := compute.Snapshot{
SnapshotProperties: &compute.SnapshotProperties{
CreationData: &compute.CreationData{
CreateOption: compute.Copy,
snapshot := armcompute.Snapshot{
Properties: &armcompute.SnapshotProperties{
CreationData: &armcompute.CreationData{
CreateOption: to.Ptr(armcompute.DiskCreateOptionCopy),
SourceResourceID: &sourceVolumeID,
},
Incremental: &incremental,
Expand All @@ -973,7 +973,7 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
if err := azureutils.ValidateDataAccessAuthMode(dataAccessAuthMode); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
snapshot.SnapshotProperties.DataAccessAuthMode = compute.DataAccessAuthMode(dataAccessAuthMode)
snapshot.Properties.DataAccessAuthMode = to.Ptr(armcompute.DataAccessAuthMode(dataAccessAuthMode))
}

if acquired := d.volumeLocks.TryAcquire(snapshotName); !acquired {
Expand Down Expand Up @@ -1002,13 +1002,17 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
}()

klog.V(2).Infof("begin to create snapshot(%s, incremental: %v) under rg(%s) region(%s)", snapshotName, incremental, resourceGroup, d.cloud.Location)
if rerr := d.cloud.SnapshotsClient.CreateOrUpdate(ctx, subsID, resourceGroup, snapshotName, snapshot); rerr != nil {
if strings.Contains(rerr.Error().Error(), "existing disk") {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("request snapshot(%s) under rg(%s) already exists, but the SourceVolumeId is different, error details: %v", snapshotName, resourceGroup, rerr.Error()))
snapshotClient, err := d.clientFactory.GetSnapshotClientForSub(subsID)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not get snapshot client for subscription(%s) with error(%v)", subsID, err)
}
if _, err := snapshotClient.CreateOrUpdate(ctx, resourceGroup, snapshotName, snapshot); err != nil {
if strings.Contains(err.Error(), "existing disk") {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("request snapshot(%s) under rg(%s) already exists, but the SourceVolumeId is different, error details: %v", snapshotName, resourceGroup, err))
}

azureutils.SleepIfThrottled(rerr.Error(), consts.SnapshotOpThrottlingSleepSec)
return nil, status.Error(codes.Internal, fmt.Sprintf("create snapshot error: %v", rerr.Error()))
azureutils.SleepIfThrottled(err, consts.SnapshotOpThrottlingSleepSec)
return nil, status.Error(codes.Internal, fmt.Sprintf("create snapshot error: %v", err.Error()))
}

if d.shouldWaitForSnapshotReady {
Expand All @@ -1025,24 +1029,24 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ

if crossRegionSnapshotName != "" {
copySnapshot := snapshot
if copySnapshot.SnapshotProperties == nil {
copySnapshot.SnapshotProperties = &compute.SnapshotProperties{}
if copySnapshot.Properties == nil {
copySnapshot.Properties = &armcompute.SnapshotProperties{}
}
if copySnapshot.SnapshotProperties.CreationData == nil {
copySnapshot.SnapshotProperties.CreationData = &compute.CreationData{}
if copySnapshot.Properties.CreationData == nil {
copySnapshot.Properties.CreationData = &armcompute.CreationData{}
}
copySnapshot.SnapshotProperties.CreationData.SourceResourceID = &csiSnapshot.SnapshotId
copySnapshot.SnapshotProperties.CreationData.CreateOption = compute.CopyStart
copySnapshot.Properties.CreationData.SourceResourceID = &csiSnapshot.SnapshotId
copySnapshot.Properties.CreationData.CreateOption = to.Ptr(armcompute.DiskCreateOptionCopyStart)
copySnapshot.Location = &location

klog.V(2).Infof("begin to create snapshot(%s, incremental: %v) under rg(%s) region(%s)", crossRegionSnapshotName, incremental, resourceGroup, location)
if rerr := d.cloud.SnapshotsClient.CreateOrUpdate(ctx, subsID, resourceGroup, crossRegionSnapshotName, copySnapshot); rerr != nil {
if strings.Contains(rerr.Error().Error(), "existing disk") {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("request snapshot(%s) under rg(%s) already exists, but the SourceVolumeId is different, error details: %v", crossRegionSnapshotName, resourceGroup, rerr.Error()))
if _, err := snapshotClient.CreateOrUpdate(ctx, resourceGroup, crossRegionSnapshotName, copySnapshot); err != nil {
if strings.Contains(err.Error(), "existing disk") {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("request snapshot(%s) under rg(%s) already exists, but the SourceVolumeId is different, error details: %v", crossRegionSnapshotName, resourceGroup, err))
}

azureutils.SleepIfThrottled(rerr.Error(), consts.SnapshotOpThrottlingSleepSec)
return nil, status.Error(codes.Internal, fmt.Sprintf("create snapshot error: %v", rerr.Error()))
azureutils.SleepIfThrottled(err, consts.SnapshotOpThrottlingSleepSec)
return nil, status.Error(codes.Internal, fmt.Sprintf("create snapshot error: %v", err))
}
klog.V(2).Infof("create snapshot(%s) under rg(%s) region(%s) successfully", crossRegionSnapshotName, resourceGroup, location)

Expand All @@ -1051,9 +1055,9 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
}

klog.V(2).Infof("begin to delete snapshot(%s) under rg(%s) region(%s)", snapshotName, resourceGroup, d.cloud.Location)
if rerr := d.cloud.SnapshotsClient.Delete(ctx, subsID, resourceGroup, snapshotName); rerr != nil {
klog.Errorf("delete snapshot error: %v", rerr.Error())
azureutils.SleepIfThrottled(rerr.Error(), consts.SnapshotOpThrottlingSleepSec)
if err = snapshotClient.Delete(ctx, resourceGroup, snapshotName); err != nil {
klog.Errorf("delete snapshot error: %v", err)
azureutils.SleepIfThrottled(err, consts.SnapshotOpThrottlingSleepSec)
} else {
klog.V(2).Infof("delete snapshot(%s) under rg(%s) region(%s) successfully", snapshotName, resourceGroup, d.cloud.Location)
}
Expand Down Expand Up @@ -1097,9 +1101,13 @@ func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequ
}()

klog.V(2).Infof("begin to delete snapshot(%s) under rg(%s)", snapshotName, resourceGroup)
if rerr := d.cloud.SnapshotsClient.Delete(ctx, subsID, resourceGroup, snapshotName); rerr != nil {
azureutils.SleepIfThrottled(rerr.Error(), consts.SnapshotOpThrottlingSleepSec)
return nil, status.Error(codes.Internal, fmt.Sprintf("delete snapshot error: %v", rerr.Error()))
snapshotClient, err := d.clientFactory.GetSnapshotClientForSub(subsID)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not get snapshot client for subscription(%s) with error(%v)", subsID, err)
}
if err := snapshotClient.Delete(ctx, resourceGroup, snapshotName); err != nil {
azureutils.SleepIfThrottled(err, consts.SnapshotOpThrottlingSleepSec)
return nil, status.Error(codes.Internal, fmt.Sprintf("delete snapshot error: %v", err))
}
klog.V(2).Infof("delete snapshot(%s) under rg(%s) successfully", snapshotName, resourceGroup)
isOperationSucceeded = true
Expand Down Expand Up @@ -1127,9 +1135,9 @@ func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReques
}
return listSnapshotResp, nil
}

snapshotClient := d.clientFactory.GetSnapshotClient()
// no SnapshotId is set, return all snapshots that satisfy the request.
snapshots, err := d.cloud.SnapshotsClient.ListByResourceGroup(ctx, "", d.cloud.ResourceGroup)
snapshots, err := snapshotClient.List(ctx, d.cloud.ResourceGroup)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown list snapshot error: %v", err.Error()))
}
Expand All @@ -1146,13 +1154,16 @@ func (d *Driver) getSnapshotByID(ctx context.Context, subsID, resourceGroup, sna
return nil, status.Errorf(codes.Internal, err.Error())
}
}

snapshot, rerr := d.cloud.SnapshotsClient.Get(ctx, subsID, resourceGroup, snapshotName)
if rerr != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("get snapshot %s from rg(%s) error: %v", snapshotName, resourceGroup, rerr.Error()))
snapshotClient, err := d.clientFactory.GetSnapshotClientForSub(subsID)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not get snapshot client for subscription(%s) with error(%v)", subsID, err)
}
snapshot, err := snapshotClient.Get(ctx, resourceGroup, snapshotName)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("get snapshot %s from rg(%s) error: %v", snapshotName, resourceGroup, err))
}

return azureutils.GenerateCSISnapshot(sourceVolumeID, &snapshot)
return azureutils.GenerateCSISnapshot(sourceVolumeID, snapshot)
}

// GetSourceDiskSize recursively searches for the sourceDisk and returns: sourceDisk disk size, error
Expand Down
Loading

0 comments on commit 572b2cd

Please sign in to comment.