fix: ensure workflowtaskresults complete before mark workflow completed status. Fixes: argoproj#12615 (argoproj#12574)

Signed-off-by: shuangkun <tsk2013uestc@163.com>
shuangkun committed Mar 10, 2024
1 parent 158a981 commit ebce8ef
Showing 4 changed files with 384 additions and 34 deletions.
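
At a high level, the change moves the "safe to complete?" decision into markWorkflowPhase: a terminal phase (and the completed label) is only recorded once every WorkflowTaskResult has been reconciled and no daemoned children remain; otherwise the daemons are killed and completion is retried on a later reconciliation. A minimal, self-contained Go sketch of that gate follows; the workflowModel type and markCompleted function are hypothetical stand-ins for illustration, not the controller's real wfOperationCtx code.

package main

import "fmt"

// Hypothetical model of the completion gate this commit adds; the real logic
// lives on wfOperationCtx in workflow/controller/operator.go.
type workflowModel struct {
	taskResultsInProgress bool   // mirrors woc.checkTaskResultsInProgress()
	daemonNodes           int    // mirrors woc.hasDaemonNodes()
	completedLabel        bool   // mirrors the workflows.argoproj.io/completed label
	phase                 string
}

// markCompleted only persists a terminal phase once task results are
// reconciled and no daemoned children remain; otherwise it stops the daemons
// and defers completion to a later reconciliation pass.
func markCompleted(wf *workflowModel, phase string) {
	if wf.taskResultsInProgress || wf.daemonNodes > 0 {
		wf.daemonNodes = 0 // analogous to woc.killDaemonedChildren("")
		return
	}
	wf.phase = phase
	wf.completedLabel = true
}

func main() {
	wf := &workflowModel{taskResultsInProgress: true, daemonNodes: 1}
	markCompleted(wf, "Failed")
	fmt.Printf("completed=%v phase=%q\n", wf.completedLabel, wf.phase) // completed=false phase=""

	wf.taskResultsInProgress = false
	markCompleted(wf, "Failed")
	fmt.Printf("completed=%v phase=%q\n", wf.completedLabel, wf.phase) // completed=true phase="Failed"
}

In the diff below the same check appears as an early return at the top of markWorkflowPhase, and checkReconciliationComplete is renamed to checkTaskResultsInProgress so callers gate directly on the in-progress state.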
32 changes: 32 additions & 0 deletions test/e2e/argo_server_test.go
@@ -1540,6 +1540,38 @@ spec:

}

// This test reproduces the problem described in https://github.com/argoproj/argo-workflows/pull/12574.
// First, add the following code to func "taskResultReconciliation"; you can increase the sleep duration to reproduce the issue more reliably.
//
// if !woc.checkTaskResultsInProgress() {
// time.Sleep(time.Second * 2)
// }
//
// Second, run the test.
// Finally, you will get a workflow that is still in Running status but whose completed label is already set to "true".
func (s *ArgoServerSuite) TestRetryStoppedButIncompleteWorkflow() {
var workflowName string
s.Given().
Workflow(`@testdata/retry-on-stopped.yaml`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
workflowName = metadata.Name
})

time.Sleep(1 * time.Second)
s.Run("Retry", func() {
s.e().PUT("/api/v1/workflows/argo/{workflowName}/retry", workflowName).
Expect().
Status(200).
JSON().
Path("$.metadata.name").
NotNull()
})
}

func (s *ArgoServerSuite) TestWorkflowTemplateService() {
s.Run("Lint", func() {
s.e().POST("/api/v1/workflow-templates/argo/lint").
65 changes: 65 additions & 0 deletions test/e2e/testdata/retry-on-stopped.yaml
@@ -0,0 +1,65 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: wf-retry-stopped-
spec:
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflows.argoproj.io/workflow: "wf-retry-stopped"
entrypoint: wf-retry-stopped-main
serviceAccountName: argo
executor:
serviceAccountName: default
templates:
- name: wf-retry-stopped-main
steps:
- - name: create
template: create
- name: sleep
template: sleep
- name: stop
template: stop

- name: sleep
container:
image: alpine:latest
command: [ sleep, "10" ]

- name: stop
container:
image: quay.io/argoproj/argocli:latest
args:
- stop
- -l
- workflows.argoproj.io/workflow=wf-retry-stopped
- --namespace=argo
- --loglevel=debug

- name: create
container:
image: argoproj/argosay:v2
command:
- sh
- -c
args:
- |
echo "hello world" > /tmp/message
sleep 999
outputs:
artifacts:
- name: my-artifact
path: /tmp/message
s3:
key: my-artifact
bucket: my-bucket
endpoint: minio:9000
insecure: true
accessKeySecret:
name: my-minio-cred
key: accesskey
secretKeySecret:
name: my-minio-cred
key: secretkey
archive:
none: {}
74 changes: 40 additions & 34 deletions workflow/controller/operator.go
@@ -190,9 +190,6 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
defer argoruntime.RecoverFromPanic(woc.log)

defer func() {
if woc.wf.Status.Fulfilled() {
woc.killDaemonedChildren("")
}
woc.persistUpdates(ctx)
}()
defer func() {
@@ -240,7 +237,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.taskResultReconciliation()

// Do artifact GC if task result reconciliation is complete.
if woc.checkReconciliationComplete() {
if woc.wf.Status.Fulfilled() {
if err := woc.garbageCollectArtifacts(ctx); err != nil {
woc.log.WithError(err).Error("failed to GC artifacts")
return
@@ -513,6 +510,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.markWorkflowError(ctx, err)
}

if !woc.wf.Status.Fulfilled() {
return
}

if woc.execWf.Spec.Metrics != nil {
woc.globalParams[common.GlobalVarWorkflowStatus] = string(workflowStatus)
localScope, realTimeScope := woc.prepareMetricScope(node)
@@ -799,8 +800,8 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
woc.log.WithError(err).Warn("error updating taskset")
}

// Make sure the TaskResults are incorporated into WorkflowStatus before we delete them.
if woc.checkReconciliationComplete() {
// Make sure the workflow completed.
if woc.wf.Status.Fulfilled() {
if err := woc.deleteTaskResults(ctx); err != nil {
woc.log.WithError(err).Warn("failed to delete task-results")
}
@@ -812,9 +813,9 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
woc.queuePodsForCleanup()
}

func (woc *wfOperationCtx) checkReconciliationComplete() bool {
func (woc *wfOperationCtx) checkTaskResultsInProgress() bool {
woc.log.Debugf("Task results completion status: %v", woc.wf.Status.TaskResultsCompletionStatus)
return woc.wf.Status.Phase.Completed() && !woc.wf.Status.TaskResultsInProgress()
return woc.wf.Status.TaskResultsInProgress()
}

func (woc *wfOperationCtx) deleteTaskResults(ctx context.Context) error {
@@ -1704,7 +1705,7 @@ func (woc *wfOperationCtx) deletePVCs(ctx context.Context) error {

switch gcStrategy {
case wfv1.VolumeClaimGCOnSuccess:
if woc.wf.Status.Phase == wfv1.WorkflowError || woc.wf.Status.Phase == wfv1.WorkflowFailed {
if woc.wf.Status.Phase != wfv1.WorkflowSucceeded {
// Skip deleting PVCs to reuse them for retried failed/error workflows.
// PVCs are automatically deleted when corresponded owner workflows get deleted.
return nil
@@ -2289,6 +2290,14 @@ func (woc *wfOperationCtx) checkTemplateTimeout(tmpl *wfv1.Template, node *wfv1.
// markWorkflowPhase is a convenience method to set the phase of the workflow with optional message
// optionally marks the workflow completed, which sets the finishedAt timestamp and completed label
func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.WorkflowPhase, message string) {
// Check whether or not the workflow needs to continue processing when it is completed
if phase.Completed() && (woc.checkTaskResultsInProgress() || woc.hasDaemonNodes()) {
woc.log.WithFields(log.Fields{"fromPhase": woc.wf.Status.Phase, "toPhase": phase}).
Debug("taskresults of workflow are incomplete or still have daemon nodes, so can't mark workflow completed")
woc.killDaemonedChildren("")
return
}

if woc.wf.Status.Phase != phase {
if woc.wf.Status.Fulfilled() {
woc.log.WithFields(log.Fields{"fromPhase": woc.wf.Status.Phase, "toPhase": phase}).
@@ -2339,33 +2348,30 @@ func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.Wor

switch phase {
case wfv1.WorkflowSucceeded, wfv1.WorkflowFailed, wfv1.WorkflowError:
// Make sure all task results have been reconciled and wait for all daemon nodes to get terminated before marking the workflow completed.
if woc.checkReconciliationComplete() && !woc.hasDaemonNodes() {
woc.log.Info("Marking workflow completed")
woc.wf.Status.FinishedAt = metav1.Time{Time: time.Now().UTC()}
woc.globalParams[common.GlobalVarWorkflowDuration] = fmt.Sprintf("%f", woc.wf.Status.FinishedAt.Sub(woc.wf.Status.StartedAt.Time).Seconds())
if woc.wf.ObjectMeta.Labels == nil {
woc.wf.ObjectMeta.Labels = make(map[string]string)
}
woc.wf.ObjectMeta.Labels[common.LabelKeyCompleted] = "true"
woc.wf.Status.Conditions.UpsertCondition(wfv1.Condition{Status: metav1.ConditionTrue, Type: wfv1.ConditionTypeCompleted})
err := woc.deletePDBResource(ctx)
if err != nil {
woc.wf.Status.Phase = wfv1.WorkflowError
woc.wf.ObjectMeta.Labels[common.LabelKeyPhase] = string(wfv1.NodeError)
woc.updated = true
woc.wf.Status.Message = err.Error()
}
if woc.controller.wfArchive.IsEnabled() {
if woc.controller.isArchivable(woc.wf) {
woc.log.Info("Marking workflow as pending archiving")
woc.wf.Labels[common.LabelKeyWorkflowArchivingStatus] = "Pending"
} else {
woc.log.Info("Doesn't match with archive label selector. Skipping Archive")
}
}
woc.log.Info("Marking workflow completed")
woc.wf.Status.FinishedAt = metav1.Time{Time: time.Now().UTC()}
woc.globalParams[common.GlobalVarWorkflowDuration] = fmt.Sprintf("%f", woc.wf.Status.FinishedAt.Sub(woc.wf.Status.StartedAt.Time).Seconds())
if woc.wf.ObjectMeta.Labels == nil {
woc.wf.ObjectMeta.Labels = make(map[string]string)
}
woc.wf.ObjectMeta.Labels[common.LabelKeyCompleted] = "true"
woc.wf.Status.Conditions.UpsertCondition(wfv1.Condition{Status: metav1.ConditionTrue, Type: wfv1.ConditionTypeCompleted})
err := woc.deletePDBResource(ctx)
if err != nil {
woc.wf.Status.Phase = wfv1.WorkflowError
woc.wf.ObjectMeta.Labels[common.LabelKeyPhase] = string(wfv1.NodeError)
woc.updated = true
woc.wf.Status.Message = err.Error()
}
if woc.controller.wfArchive.IsEnabled() {
if woc.controller.isArchivable(woc.wf) {
woc.log.Info("Marking workflow as pending archiving")
woc.wf.Labels[common.LabelKeyWorkflowArchivingStatus] = "Pending"
} else {
woc.log.Info("Doesn't match with archive label selector. Skipping Archive")
}
}
woc.updated = true
woc.controller.queuePodForCleanup(woc.wf.Namespace, woc.getAgentPodName(), deletePod)
}
}