Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix data race reported by go test ./... -race #371

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions pkg/csi/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package csi
import (
"context"
"fmt"
"sync/atomic"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
Expand All @@ -20,8 +21,6 @@ func NewMockClient(
supportsNodeResize: supportsNodeResize,
supportsControllerResize: supportsControllerResize,
supportsControllerModify: supportsControllerModify,
expandCalled: 0,
modifyCalled: 0,
supportsPluginControllerService: supportsPluginControllerService,
supportsControllerSingleNodeMultiWriter: supportsControllerSingleNodeMultiWriter,
}
Expand All @@ -34,13 +33,13 @@ type MockClient struct {
supportsControllerModify bool
supportsPluginControllerService bool
supportsControllerSingleNodeMultiWriter bool
expandCalled int
modifyCalled int
expandCalled atomic.Int32
modifyCalled atomic.Int32
expansionFailed bool
modifyFailed bool
checkMigratedLabel bool
usedSecrets map[string]string
usedCapability *csi.VolumeCapability
usedSecrets atomic.Pointer[map[string]string]
usedCapability atomic.Pointer[csi.VolumeCapability]
}

func (c *MockClient) GetDriverName(context.Context) (string, error) {
Expand Down Expand Up @@ -87,7 +86,7 @@ func (c *MockClient) Expand(
capability *csi.VolumeCapability) (int64, bool, error) {
// TODO: Determine whether the operation succeeds or fails by parameters.
if c.expansionFailed {
c.expandCalled++
c.expandCalled.Add(1)
return requestBytes, c.supportsNodeResize, fmt.Errorf("expansion failed")
}
if c.checkMigratedLabel {
Expand All @@ -99,27 +98,27 @@ func (c *MockClient) Expand(
return requestBytes, c.supportsNodeResize, err
}
}
c.expandCalled++
c.usedSecrets = secrets
c.usedCapability = capability
c.expandCalled.Add(1)
c.usedSecrets.Store(&secrets)
c.usedCapability.Store(capability)
return requestBytes, c.supportsNodeResize, nil
}

func (c *MockClient) GetExpandCount() int {
return c.expandCalled
return int(c.expandCalled.Load())
}

func (c *MockClient) GetModifyCount() int {
return c.modifyCalled
return int(c.modifyCalled.Load())
}

func (c *MockClient) GetCapability() *csi.VolumeCapability {
return c.usedCapability
return c.usedCapability.Load()
}

// GetSecrets returns secrets used for volume expansion
func (c *MockClient) GetSecrets() map[string]string {
return c.usedSecrets
return *c.usedSecrets.Load()
}

func (c *MockClient) CloseConnection() {
Expand All @@ -131,7 +130,7 @@ func (c *MockClient) Modify(
volumeID string,
secrets map[string]string,
mutableParameters map[string]string) error {
c.modifyCalled++
c.modifyCalled.Add(1)
if c.modifyFailed {
return fmt.Errorf("modify failed")
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/modifycontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,29 +196,36 @@ func isFirstTimeModifyVolumeWithPVC(pvc *v1.PersistentVolumeClaim, pv *v1.Persis
return false
}

// Run starts the controller.
func (ctrl *modifyController) Run(
workers int, ctx context.Context) {
defer ctrl.claimQueue.ShutDown()

klog.InfoS("Starting external resizer for modify volume", "controller", ctrl.name)
defer klog.InfoS("Shutting down external resizer", "controller", ctrl.name)

stopCh := ctx.Done()
func (ctrl *modifyController) init(ctx context.Context) bool {
informersSyncd := []cache.InformerSynced{ctrl.pvListerSynced, ctrl.pvcListerSynced}
informersSyncd = append(informersSyncd, ctrl.vacListerSynced)

if !cache.WaitForCacheSync(stopCh, informersSyncd...) {
if !cache.WaitForCacheSync(ctx.Done(), informersSyncd...) {
klog.ErrorS(nil, "Cannot sync pod, pv, pvc or vac caches")
return
return false
}

// Cache all the InProgress/Infeasible PVCs as Uncertain for ModifyVolume
err := ctrl.initUncertainPVCs()
if err != nil {
klog.ErrorS(err, "Failed to initialize uncertain pvcs")
}
return true
}

// Run starts the controller.
func (ctrl *modifyController) Run(
workers int, ctx context.Context) {
defer ctrl.claimQueue.ShutDown()

klog.InfoS("Starting external resizer for modify volume", "controller", ctrl.name)
defer klog.InfoS("Shutting down external resizer", "controller", ctrl.name)

if !ctrl.init(ctx) {
return
}

stopCh := ctx.Done()
for i := 0; i < workers; i++ {
go wait.Until(ctrl.sync, 0, stopCh)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/modifycontroller/modify_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,10 @@ func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) {

ctx := context.TODO()
defer ctx.Done()
go controller.Run(1, ctx)
success := ctrlInstance.init(ctx)
if !success {
t.Fatal("failed to init controller")
}

for _, obj := range initialObjects {
switch obj.(type) {
Expand Down