From 0f9dd8a368445ae9a0bfc395e59b1dd15720dd2e Mon Sep 17 00:00:00 2001 From: shuangkun Date: Tue, 14 May 2024 19:36:26 +0800 Subject: [PATCH] fix: prevent workflow taskresult from being deleted before completion. Fixes:#12993 Signed-off-by: shuangkun --- util/util.go | 10 +++++++++ workflow/common/common.go | 3 +++ workflow/controller/controller.go | 13 ++--------- workflow/executor/executor.go | 36 ++++++++++++++++++++++++++++++- 4 files changed, 50 insertions(+), 12 deletions(-) diff --git a/util/util.go b/util/util.go index d68a7ed293a6..2ddb6bb89613 100644 --- a/util/util.go +++ b/util/util.go @@ -132,3 +132,13 @@ func GetDeletePropagation() *metav1.DeletionPropagation { } return &propagationPolicy } + +func RemoveFinalizer(finalizers []string, targetFinalizer string) []string { + var updatedFinalizers []string + for _, finalizer := range finalizers { + if finalizer != targetFinalizer { + updatedFinalizers = append(updatedFinalizers, finalizer) + } + } + return updatedFinalizers +} diff --git a/workflow/common/common.go b/workflow/common/common.go index b4f174263eaa..f789c86ac60b 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -174,6 +174,9 @@ const ( // Finalizer blocks the deletion of pods until the controller captures their status. FinalizerPodStatus = workflow.WorkflowFullName + "/status" + // Finalizer blocks the deletion of pods until the taskresult marked completed. + FinalizerTaskResultStatus = workflow.WorkflowFullName + "/taskresult" + // Variables that are added to the scope during template execution and can be referenced using {{}} syntax // GlobalVarWorkflowName is a global workflow variable referencing the workflow's metadata.name field diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index e738f5f64504..056dd730643e 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -42,6 +42,7 @@ import ( "github.com/argoproj/argo-workflows/v3" "github.com/argoproj/argo-workflows/v3/config" argoErr "github.com/argoproj/argo-workflows/v3/errors" + argoUtil "github.com/argoproj/argo-workflows/v3/util" "github.com/argoproj/argo-workflows/v3/persist/sqldb" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" @@ -622,7 +623,7 @@ func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods ty updatedPod.Labels[common.LabelKeyCompleted] = "true" } - updatedPod.Finalizers = removeFinalizer(updatedPod.Finalizers, common.FinalizerPodStatus) + updatedPod.Finalizers = argoUtil.RemoveFinalizer(updatedPod.Finalizers, common.FinalizerPodStatus) _, err = pods.Update(ctx, updatedPod, metav1.UpdateOptions{}) return err @@ -633,16 +634,6 @@ func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods ty return nil } -func removeFinalizer(finalizers []string, targetFinalizer string) []string { - var updatedFinalizers []string - for _, finalizer := range finalizers { - if finalizer != targetFinalizer { - updatedFinalizers = append(updatedFinalizers, finalizer) - } - } - return updatedFinalizers -} - func (wfc *WorkflowController) signalContainers(ctx context.Context, namespace string, podName string, sig syscall.Signal) (time.Duration, error) { pod, err := wfc.getPodFromCache(namespace, podName) if pod == nil || err != nil { diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 07cfe5d6b32f..5abee2ab691d 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -35,7 +35,7 @@ import ( argoerrs "github.com/argoproj/argo-workflows/v3/errors" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" argoprojv1 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1" - "github.com/argoproj/argo-workflows/v3/util" + argoutil "github.com/argoproj/argo-workflows/v3/util" "github.com/argoproj/argo-workflows/v3/util/archive" errorsutil "github.com/argoproj/argo-workflows/v3/util/errors" "github.com/argoproj/argo-workflows/v3/util/retry" @@ -803,6 +803,7 @@ func (we *WorkflowExecutor) FinalizeOutput(ctx context.Context) { // Only added as a backup in case LabelKeyReportOutputsCompleted could not be set err = we.AddAnnotation(ctx, common.AnnotationKeyReportOutputsCompleted, "true") } + err = we.RemoveFinalizer(ctx, common.FinalizerTaskResultStatus) return err }) if err != nil { @@ -824,6 +825,7 @@ func (we *WorkflowExecutor) InitializeOutput(ctx context.Context) { // Only added as a backup in case LabelKeyReportOutputsCompleted could not be set err = we.AddAnnotation(ctx, common.AnnotationKeyReportOutputsCompleted, "false") } + err = we.AddFinalizer(ctx, common.FinalizerTaskResultStatus) return err }) if err != nil { @@ -897,6 +899,38 @@ func (we *WorkflowExecutor) AddAnnotation(ctx context.Context, key, value string } +// AddFinalizer adds a Finalizer to the workflow pod +func (we *WorkflowExecutor) AddFinalizer(ctx context.Context, finalizer string) error { + data, err := json.Marshal(map[string]interface{}{"metadata": metav1.ObjectMeta{ + Finalizers: []string { + finalizer, + }, + }}) + if err != nil { + return err + } + _, err = we.ClientSet.CoreV1().Pods(we.Namespace).Patch(ctx, we.PodName, types.MergePatchType, data, metav1.PatchOptions{}) + return err +} + +// RemoveFinalizer remove a Finalizer from the workflow pod +func (we *WorkflowExecutor) RemoveFinalizer(ctx context.Context, finalizer string) error { + err := retryutil.RetryOnConflict(retry.DefaultRetry, func() error { + currentPod, err := we.ClientSet.CoreV1().Pods(we.Namespace).Get(ctx, we.PodName, metav1.GetOptions{}) + if err != nil { + return err + } + updatedPod := currentPod.DeepCopy() + updatedPod.Finalizers = argoutil.RemoveFinalizer(updatedPod.Finalizers, finalizer) + _, err = we.ClientSet.CoreV1().Pods(we.Namespace).Update(ctx, updatedPod, metav1.UpdateOptions{}) + return err + }) + if err != nil { + return err + } + return nil +} + // isTarball returns whether or not the file is a tarball func isTarball(filePath string) (bool, error) { log.Infof("Detecting if %s is a tarball", filePath)