diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 49f75507e596..dc1ee23d4ec6 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -671,7 +671,8 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate timeToWait := baseDuration if retryStrategy.Backoff.Factor > 0 { // Formula: timeToWait = duration * factor^retry_number - timeToWait = baseDuration * time.Duration(math.Pow(float64(retryStrategy.Backoff.Factor), float64(len(node.Children)))) + // Note that timeToWait should equal to duration for the first retry attempt. + timeToWait = baseDuration * time.Duration(math.Pow(float64(retryStrategy.Backoff.Factor), float64(len(node.Children)-1))) } waitingDeadline := lastChildNode.FinishedAt.Add(timeToWait) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 133a72f71bd4..95e79813df86 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -3,6 +3,8 @@ package controller import ( "encoding/json" "fmt" + "regexp" + "strconv" "strings" "testing" "time" @@ -10,6 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -361,6 +364,102 @@ func TestProcessNodesWithRetriesWithBackoff(t *testing.T) { assert.Equal(t, n.Phase, wfv1.NodeSucceeded) } +func TestProcessNodesWithRetriesWithExponentialBackoff(t *testing.T) { + require := require.New(t) + + cancel, controller := newController() + defer cancel() + require.NotNil(controller) + wf := unmarshalWF(helloWorldWf) + require.NotNil(wf) + woc := newWorkflowOperationCtx(wf, controller) + require.NotNil(woc) + + // Verify that there are no nodes in the wf status. 
+ require.Zero(len(woc.wf.Status.Nodes)) + + // Add the parent node for retries. + nodeName := "test-node" + nodeID := woc.wf.NodeID(nodeName) + node := woc.initializeNode(nodeName, wfv1.NodeTypeRetry, "", &wfv1.Template{}, "", wfv1.NodeRunning) + retries := wfv1.RetryStrategy{} + retryLimit := int32(2) + retries.Limit = &retryLimit + retries.RetryPolicy = wfv1.RetryPolicyAlways + retries.Backoff = &wfv1.Backoff{ + Duration: "5m", + Factor: 2, + } + woc.wf.Status.Nodes[nodeID] = *node + + require.Equal(wfv1.NodeRunning, node.Phase) + + // Ensure there are no child nodes yet. + lastChild := getChildNodeIndex(node, woc.wf.Status.Nodes, -1) + require.Nil(lastChild) + + woc.initializeNode("child-node-1", wfv1.NodeTypePod, "", &wfv1.Template{}, "", wfv1.NodeFailed) + woc.addChildNode(nodeName, "child-node-1") + + n := woc.wf.GetNodeByName(nodeName) + + // Last child has failed. processNodesWithRetries() should return false due to the default backoff. + var err error + n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + require.NoError(err) + require.Equal(wfv1.NodeRunning, n.Phase) + + // First backoff should be between 295 and 300 seconds. + backoff, err := parseRetryMessage(n.Message) + require.NoError(err) + require.LessOrEqual(backoff, 300) + require.Less(295, backoff) + + woc.initializeNode("child-node-2", wfv1.NodeTypePod, "", &wfv1.Template{}, "", wfv1.NodeError) + woc.addChildNode(nodeName, "child-node-2") + n = woc.wf.GetNodeByName(nodeName) + + n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + require.NoError(err) + require.Equal(wfv1.NodeRunning, n.Phase) + + // Second backoff should be between 595 and 600 seconds. + backoff, err = parseRetryMessage(n.Message) + require.NoError(err) + require.LessOrEqual(backoff, 600) + require.Less(595, backoff) + + // Mark lastChild as successful. 
// retryBackoffMessagePattern matches the "Backoff for <M> minutes <S> seconds"
// fragment of a message. It is compiled once at package scope instead of on
// every parseRetryMessage call.
var retryBackoffMessagePattern = regexp.MustCompile(`Backoff for (\d+) minutes (\d+) seconds`)

// parseRetryMessage extracts the wait announced in message and returns it as a
// total number of seconds (minutes*60 + seconds).
//
// The message must contain the text "Backoff for <M> minutes <S> seconds",
// where <M> and <S> are decimal integers; the fragment may appear anywhere in
// message. An error is returned, together with 0, when the fragment is absent
// or when either number does not fit in an int.
func parseRetryMessage(message string) (int, error) {
	matches := retryBackoffMessagePattern.FindStringSubmatch(message)
	// A successful match yields the full match plus the two capture groups.
	if len(matches) != 3 {
		// %q keeps an empty or whitespace-only message visible in the error.
		return 0, fmt.Errorf("unexpected message: %q", message)
	}

	// The groups only contain digits, so Atoi can fail solely on overflow.
	minutes, err := strconv.Atoi(matches[1])
	if err != nil {
		return 0, fmt.Errorf("parsing minutes in %q: %w", message, err)
	}

	seconds, err := strconv.Atoi(matches[2])
	if err != nil {
		return 0, fmt.Errorf("parsing seconds in %q: %w", message, err)
	}

	return minutes*60 + seconds, nil
}