Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add success event for static volume provisioning #2765

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 9 additions & 4 deletions pkg/syncer/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func fullSyncCreateVolumes(ctx context.Context, createSpecArray []cnstypes.CnsVo
volManager volumes.Manager, vc string) {
log := logger.GetLogger(ctx)
defer wg.Done()
currentK8sPVMap := make(map[string]bool)
currentK8sPVMap := make(map[string]*v1.PersistentVolume)
volumeOperationsLock[vc].Lock()
defer volumeOperationsLock[vc].Unlock()
// Get all K8s PVs in the given VC.
Expand All @@ -493,7 +493,7 @@ func fullSyncCreateVolumes(ctx context.Context, createSpecArray []cnstypes.CnsVo
// Create map for easy lookup.
for _, pv := range currentK8sPV {
if pv.Spec.CSI != nil {
currentK8sPVMap[pv.Spec.CSI.VolumeHandle] = true
currentK8sPVMap[pv.Spec.CSI.VolumeHandle] = pv
} else if migrationFeatureStateForFullSync && pv.Spec.VsphereVolume != nil {
migrationVolumeSpec := &migration.VolumeSpec{
VolumePath: pv.Spec.VsphereVolume.VolumePath,
Expand All @@ -505,7 +505,7 @@ func fullSyncCreateVolumes(ctx context.Context, createSpecArray []cnstypes.CnsVo
vc, migrationVolumeSpec, err)
return
}
currentK8sPVMap[volumeHandle] = true
currentK8sPVMap[volumeHandle] = pv
}
}
for _, createSpec := range createSpecArray {
Expand All @@ -523,7 +523,7 @@ func fullSyncCreateVolumes(ctx context.Context, createSpecArray []cnstypes.CnsVo
spew.Sdump(createSpec))
continue
}
if _, existsInK8s := currentK8sPVMap[volumeID]; existsInK8s {
if pv, existsInK8s := currentK8sPVMap[volumeID]; existsInK8s {
log.Debugf("FullSync for VC %s: Calling CreateVolume for volume id: %q with createSpec %+v",
vc, volumeID, spew.Sdump(createSpec))
_, _, err := volManager.CreateVolume(ctx, &createSpec, nil)
Expand All @@ -533,6 +533,11 @@ func fullSyncCreateVolumes(ctx context.Context, createSpecArray []cnstypes.CnsVo
continue
}

if !isDynamicallyCreatedVolume(ctx, pv) {
generateEventOnPv(ctx, pv, v1.EventTypeNormal,
staticVolumeProvisioningSuccessReason, staticVolumeProvisioningSuccessMessage)
}

if isMultiVCenterFssEnabled && len(metadataSyncer.configInfo.Cfg.VirtualCenter) > 1 {
// Create CNSVolumeInfo CR for the volume ID.
err = volumeInfoService.CreateVolumeInfo(ctx, volumeID, vc)
Expand Down
19 changes: 13 additions & 6 deletions pkg/syncer/metadatasyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2285,7 +2285,8 @@ func csiPVUpdated(ctx context.Context, newPv *v1.PersistentVolume, oldPv *v1.Per
if err != nil {
log.Errorf("PVUpdated: Failed to get VC host and volume manager for multi VC setup. "+
"Error occurred: %+v", err)
generateEventOnPv(ctx, oldPv, staticVolumeProvisioningFailure, "Failed to identify VC for volume.")
generateEventOnPv(ctx, oldPv, v1.EventTypeWarning,
staticVolumeProvisioningFailure, "Failed to identify VC for volume.")
return
}
} else {
Expand All @@ -2312,7 +2313,8 @@ func csiPVUpdated(ctx context.Context, newPv *v1.PersistentVolume, oldPv *v1.Per
} else {
// Failed to create static PV
log.Errorf("PVUpdated: Failed to create static file volume %q. Error: %+v", newPv.Name, err)
generateEventOnPv(ctx, oldPv, staticVolumeProvisioningFailure, "Failed to create volume on any of the VCs")
generateEventOnPv(ctx, oldPv, v1.EventTypeWarning,
staticVolumeProvisioningFailure, "Failed to create volume on any of the VCs")
return
}
return
Expand All @@ -2326,7 +2328,8 @@ func csiPVUpdated(ctx context.Context, newPv *v1.PersistentVolume, oldPv *v1.Per
if err != nil {
log.Errorf("PVUpdated: Failed to get VC host and volume manager for single VC setup. "+
"Error occoured: %+v", err)
generateEventOnPv(ctx, oldPv, staticVolumeProvisioningFailure, "Failed to identify VC for volume")
generateEventOnPv(ctx, oldPv, v1.EventTypeWarning,
staticVolumeProvisioningFailure, "Failed to identify VC for volume")
return
}
}
Expand Down Expand Up @@ -2355,9 +2358,13 @@ func csiPVUpdated(ctx context.Context, newPv *v1.PersistentVolume, oldPv *v1.Per
log.Infof("PVUpdated: Verified volume: %q is not marked as container volume in CNS. "+
"Calling CreateVolume with BackingID to mark volume as Container Volume.", oldPv.Spec.CSI.VolumeHandle)
// Call CreateVolume for Static Volume Provisioning.
_ = createCnsVolume(ctx, oldPv, metadataSyncer, cnsVolumeMgr, volumeType, vcHost, metadataList, volumeHandle)
errMsg := fmt.Sprintf("Failed to create volume on VC %s", vcHost)
generateEventOnPv(ctx, oldPv, staticVolumeProvisioningFailure, errMsg)
err = createCnsVolume(ctx, oldPv, metadataSyncer, cnsVolumeMgr, volumeType, vcHost, metadataList, volumeHandle)
if err != nil {
errMsg := fmt.Sprintf("Failed to create volume on VC %s", vcHost)
log.Errorf(errMsg)
generateEventOnPv(ctx, oldPv, v1.EventTypeWarning,
staticVolumeProvisioningFailure, errMsg)
}
return
} else if queryResult.Volumes[0].VolumeId.Id == oldPv.Spec.CSI.VolumeHandle {
log.Infof("PVUpdated: Verified volume: %q is already marked as container volume in CNS.",
Expand Down
25 changes: 22 additions & 3 deletions pkg/syncer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ const (
syncerComponent = "VSphere CSI Syncer"
// reason for PV creation failure when static volume provioining fails
staticVolumeProvisioningFailure = "static volume provisioning failed"
// reason for successful PV creation for static volumes
staticVolumeProvisioningSuccessReason = "static volume provisioning succeeded"
// message for successful PV creation for static volumes
staticVolumeProvisioningSuccessMessage = "Successfully created container volume"
)

// getPVsInBoundAvailableOrReleased return PVs in Bound, Available or Released
Expand Down Expand Up @@ -710,7 +714,8 @@ func createVolumeOnMultiVc(ctx context.Context, pv *v1.PersistentVolume,
"Failed to create volume %s on any of the VCs", volumeHandle)
}

func generateEventOnPv(ctx context.Context, pv *v1.PersistentVolume, failureReason string, errorMsg string) {
func generateEventOnPv(ctx context.Context, pv *v1.PersistentVolume,
eventType string, failureReason string, errorMsg string) {
log := logger.GetLogger(ctx)

eventBroadcaster := record.NewBroadcaster()
Expand All @@ -722,7 +727,7 @@ func generateEventOnPv(ctx context.Context, pv *v1.PersistentVolume, failureReas

eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")})
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: syncerComponent})
eventRecorder.Event(pv, v1.EventTypeWarning, failureReason, errorMsg)
eventRecorder.Event(pv, eventType, failureReason, errorMsg)
}

func createCnsVolume(ctx context.Context, pv *v1.PersistentVolume,
Expand Down Expand Up @@ -851,9 +856,23 @@ func createMissingFileVolumeInfoCrs(ctx context.Context, metadataSyncer *metadat
string(cnstypes.CnsKubernetesEntityTypePV), "", clusterIDforVolumeMetadata, nil)
metadataList = append(metadataList, cnstypes.BaseCnsEntityMetadata(pvMetadata))

_, _, _ = createVolumeOnMultiVc(ctx, pv, metadataSyncer,
_, _, err = createVolumeOnMultiVc(ctx, pv, metadataSyncer,
common.FileVolumeType, metadataList, pv.Spec.CSI.VolumeHandle)
if err == nil {
if !isDynamicallyCreatedVolume(ctx, pv) {
generateEventOnPv(ctx, pv, v1.EventTypeNormal,
staticVolumeProvisioningSuccessReason, staticVolumeProvisioningSuccessMessage)
}
}
}
}

func isDynamicallyCreatedVolume(ctx context.Context, pv *v1.PersistentVolume) bool {
isdynamicCSIPV := false
if pv.Spec.CSI != nil {
_, isdynamicCSIPV = pv.Spec.CSI.VolumeAttributes[attribCSIProvisionerID]
}
return isdynamicCSIPV
}

func getPatchData(oldObj, newObj interface{}) ([]byte, error) {
Expand Down