Skip to content

Commit

Permalink
Handle errors from CNS in idempotency workflow: (#1284)
Browse files Browse the repository at this point in the history
- If CreateVolume task is deleted from vCenter, then query CNS.
- If CreateVolume task fails, retry.
- If ExtendVolume task is deleted from vCenter, then query CNS.
- If ExtendVolume task returns a CnsFault, then query CNS.
  • Loading branch information
RaunakShah committed Oct 8, 2021
1 parent 22f9828 commit 22a00a3
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 32 deletions.
118 changes: 95 additions & 23 deletions pkg/common/cns-lib/volume/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ func (m *defaultManager) createVolumeWithImprovedIdempotency(ctx context.Context
if !isStaticallyProvisioned(spec) {
// Persist task details only for dynamically provisioned volumes.
volumeOperationDetails = createRequestDetails(volNameFromInputSpec, "", "", 0,
volumeOperationDetails.OperationDetails.TaskInvocationTimestamp,
task.Reference().Value, "", taskInvocationStatusInProgress, "")
metav1.Now(), task.Reference().Value, "", taskInvocationStatusInProgress,
"")
err = m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails)
if err != nil {
// Don't return if CreateVolume details can't be stored.
Expand All @@ -347,16 +347,60 @@ func (m *defaultManager) createVolumeWithImprovedIdempotency(ctx context.Context
}
}

// Get the taskInfo.
taskInfo, err := cns.GetTaskInfo(ctx, task)
if err != nil || taskInfo == nil {
log.Errorf("failed to get taskInfo for CreateVolume task with err: %v", err)
if err != nil {
faultType = ExtractFaultTypeFromErr(ctx, err)
} else {
faultType = csifault.CSITaskInfoEmptyFault
taskInfo, err := task.WaitForResult(ctx, nil)
if err != nil {
if cnsvsphere.IsManagedObjectNotFound(err, task.Reference()) {
log.Debugf("CreateVolume task %s not found in vCenter. Querying CNS "+
"to determine if the volume %s was successfully created.",
task.Reference().Value, volNameFromInputSpec)
queryFilter := cnstypes.CnsQueryFilter{
Names: []string{volNameFromInputSpec},
ContainerClusterIds: []string{spec.Metadata.ContainerClusterArray[0].ClusterId},
}
queryResult, queryAllVolumeErr := m.QueryAllVolume(ctx, queryFilter, cnstypes.CnsQuerySelection{})
if queryAllVolumeErr != nil {
log.Debugf("failed to query CNS for volume %s with error: %v. Cannot "+
"determine if CreateVolume task %s was successful.", volNameFromInputSpec,
queryAllVolumeErr, task.Reference().Value)
return nil, ExtractFaultTypeFromErr(ctx, err), err
}
if len(queryResult.Volumes) > 0 {
if len(queryResult.Volumes) != 1 {
log.Infof("CNS Query returned multiple entries for volume %s: %v. Returning volume ID "+
"of first entry: %s", volNameFromInputSpec, spew.Sdump(queryResult.Volumes),
queryResult.Volumes[0].VolumeId.Id)
}
volumeID := queryResult.Volumes[0].VolumeId.Id
volumeOperationDetails = createRequestDetails(volNameFromInputSpec, volumeID, "", 0,
volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, task.Reference().Value,
volumeOperationDetails.OperationDetails.TaskID, taskInvocationStatusSuccess, "")
log.Debugf("Found volume %s with id %s", volNameFromInputSpec, volumeID)
return &CnsVolumeInfo{
DatastoreURL: "",
VolumeID: cnstypes.CnsVolumeId{
Id: volumeID,
},
}, "", nil
}
log.Errorf("volume with name %s not present in CNS. Marking task %s as failed.",
volNameFromInputSpec, task.Reference().Value)
volumeOperationDetails = createRequestDetails(volNameFromInputSpec, "", "", 0,
volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, task.Reference().Value,
volumeOperationDetails.OperationDetails.TaskID, taskInvocationStatusError, err.Error())

return nil, ExtractFaultTypeFromErr(ctx, err), err
}
return nil, faultType, err
// WaitForResult can fail for many reasons, including:
// - CNS restarted and marked "InProgress" tasks as "Failed".
// - Any failures from CNS.
// - Any failures at lower layers like FCD and SPS.
// In all cases, mark task as failed and retry.
log.Errorf("failed to get CreateVolume taskInfo from CNS with error: %v", err)
volumeOperationDetails = createRequestDetails(volNameFromInputSpec, "", "", 0,
volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, task.Reference().Value,
volumeOperationDetails.OperationDetails.TaskID, taskInvocationStatusError, err.Error())

return nil, ExtractFaultTypeFromErr(ctx, err), err
}

log.Infof("CreateVolume: VolumeName: %q, opId: %q", volNameFromInputSpec, taskInfo.ActivationId)
Expand Down Expand Up @@ -1107,7 +1151,7 @@ func (m *defaultManager) ExpandVolume(ctx context.Context, volumeID string, size
return faultType, err
}

// createVolumeWithoutIdempotency invokes CNS ExpandVolume.
// expandVolume invokes CNS ExpandVolume.
func (m *defaultManager) expandVolume(ctx context.Context, volumeID string, size int64) (string, error) {
log := logger.GetLogger(ctx)
// Construct the CNS ExtendSpec list.
Expand Down Expand Up @@ -1263,18 +1307,33 @@ func (m *defaultManager) expandVolumeWithImprovedIdempotency(ctx context.Context
}
}

// Get the taskInfo.
taskInfo, err := cns.GetTaskInfo(ctx, task)
if err != nil || taskInfo == nil {
log.Errorf("failed to get taskInfo for ExtendVolume task from vCenter %q with err: %v",
m.virtualCenter.Config.Host, err)
if err != nil {
faultType = ExtractFaultTypeFromErr(ctx, err)
} else {
faultType = csifault.CSITaskInfoEmptyFault
taskInfo, err := task.WaitForResult(ctx, nil)
if err != nil {
if cnsvsphere.IsManagedObjectNotFound(err, task.Reference()) {
log.Debugf("ExtendVolume task %s not found in vCenter. Querying CNS "+
"to determine if volume with ID %s was successfully expanded.",
task.Reference().Value, volumeID)
if validateVolumeCapacity(ctx, m, volumeID, size) {
log.Infof("ExpandVolume: Volume expanded successfully to size %d. volumeID: %q, task: %q",
volumeOperationDetails.Capacity, volumeID, task.Reference().Value)
volumeOperationDetails = createRequestDetails(instanceName, "", "", volumeOperationDetails.Capacity,
volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, task.Reference().Value,
volumeOperationDetails.OperationDetails.TaskID, taskInvocationStatusSuccess, "")
return "", nil
}
}
return faultType, err
// WaitForResult can fail for many reasons, including:
// - CNS restarted and marked "InProgress" tasks as "Failed".
// - Any other CNS failures.
// - Any other failures at lower layers like FCD and SPS.
// In all cases, mark task as failed and retry.
log.Errorf("failed to expand volume with ID %s with error %+v", volumeID, err)
volumeOperationDetails = createRequestDetails(instanceName, "", "", volumeOperationDetails.Capacity,
volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, task.Reference().Value,
volumeOperationDetails.OperationDetails.TaskID, taskInvocationStatusError, err.Error())
return ExtractFaultTypeFromErr(ctx, err), err
}

log.Infof("ExpandVolume: volumeID: %q, opId: %q", volumeID, taskInfo.ActivationId)
// Get the task results for the given task.
taskResult, err := getTaskResultFromTaskInfo(ctx, taskInfo)
Expand All @@ -1292,10 +1351,23 @@ func (m *defaultManager) expandVolumeWithImprovedIdempotency(ctx context.Context

volumeOperationRes := taskResult.GetCnsVolumeOperationResult()
if volumeOperationRes.Fault != nil {
if _, ok := volumeOperationRes.Fault.Fault.(cnstypes.CnsFault); ok {
log.Debugf("ExtendVolume task %s returned with CnsFault. Querying CNS to "+
"determine if volume with ID %s was successfully expanded.",
task.Reference().Value, volumeID)
if validateVolumeCapacity(ctx, m, volumeID, size) {
log.Infof("ExpandVolume: Volume expanded successfully to size %d. volumeID: %q, task: %q",
volumeOperationDetails.Capacity, volumeID, task.Reference().Value)
volumeOperationDetails = createRequestDetails(instanceName, "", "", volumeOperationDetails.Capacity,
volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, task.Reference().Value,
volumeOperationDetails.OperationDetails.TaskID, taskInvocationStatusSuccess, "")
return "", nil
}
}
faultType = ExtractFaultTypeFromVolumeResponseResult(ctx, volumeOperationRes)
volumeOperationDetails = createRequestDetails(instanceName, "", "", volumeOperationDetails.Capacity,
volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, task.Reference().Value,
taskInfo.ActivationId, taskInvocationStatusError, err.Error())
taskInfo.ActivationId, taskInvocationStatusError, volumeOperationRes.Fault.LocalizedMessage)
return faultType, logger.LogNewErrorf(log, "failed to extend volume: %q, fault: %q, opID: %q",
volumeID, spew.Sdump(volumeOperationRes.Fault), taskInfo.ActivationId)
}
Expand Down
32 changes: 25 additions & 7 deletions pkg/common/cns-lib/volume/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,11 @@ func getTaskResultFromTaskInfo(ctx context.Context, taskInfo *types.TaskInfo) (c
log := logger.GetLogger(ctx)
// Get the taskResult.
taskResult, err := cns.GetTaskResult(ctx, taskInfo)

if err != nil {
log.Errorf("failed to get task result for task with ID: %q, opId: %q result: %+v",
taskInfo.Task.Value, taskInfo.ActivationId, taskResult)
log.Errorf("failed to get task result for task with ID: %q, opId: %q error: %+v",
taskInfo.Task.Value, taskInfo.ActivationId, err)
return nil, err
}

if taskResult == nil {
return nil, logger.LogNewErrorf(log, "taskResult is empty for task: %q", taskInfo.ActivationId)
}
return taskResult, nil
}

Expand Down Expand Up @@ -441,3 +436,26 @@ func getPendingCreateSnapshotTaskFromMap(ctx context.Context, snapshotName strin
}
return task
}

// validateVolumeCapacity queries the CNS volume and validates the returned size with
// input size.
// Returns true if the volume capacity is greater than or equal to the input size.
func validateVolumeCapacity(ctx context.Context, m *defaultManager, volumeID string, size int64) bool {
log := logger.GetLogger(ctx)
queryFilter := cnstypes.CnsQueryFilter{
VolumeIds: []cnstypes.CnsVolumeId{{Id: volumeID}},
}
querySelection := cnstypes.CnsQuerySelection{
Names: []string{
string(cnstypes.QuerySelectionNameTypeBackingObjectDetails),
},
}
queryResult, queryAllVolumeErr := m.QueryAllVolume(ctx, queryFilter, querySelection)
if queryAllVolumeErr != nil {
log.Debugf("failed to query CNS for volume %s with error: %v. Cannot "+
"determine volume capacity.", volumeID, queryAllVolumeErr)
return false
}
return len(queryResult.Volumes) > 0 &&
queryResult.Volumes[0].BackingObjectDetails.GetCnsBackingObjectDetails().CapacityInMb >= size
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ func (or *operationRequestStore) cleanupStaleInstances(cleanupInterval int) {
trimmedName = instance.Name
case strings.HasPrefix(instance.Name, "delete"):
trimmedName = strings.TrimPrefix(instance.Name, "delete-")
case strings.HasPrefix(instance.Name, "extend"):
trimmedName = strings.TrimPrefix(instance.Name, "extend-")
case strings.HasPrefix(instance.Name, "expand"):
trimmedName = strings.TrimPrefix(instance.Name, "expand-")
}
if _, ok := instanceMap[trimmedName]; !ok {
err = or.deleteRequestDetails(ctx, instance.Name)
Expand Down

0 comments on commit 22a00a3

Please sign in to comment.