Skip to content

Commit

Permalink
Feat: add recycle grouped workflow run with cron
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong committed Mar 2, 2023
1 parent 8eae143 commit d6d728a
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 16 deletions.
18 changes: 9 additions & 9 deletions charts/vela-workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ helm install --create-namespace -n vela-system workflow kubevela/vela-workflow -

### KubeVela workflow parameters

| Name | Description | Value |
| -------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `workflow.enableSuspendOnFailure` | Enable the capability of suspend an failed workflow automatically | `false` |
| `workflow.enablePatchStatusAtOnce` | Enable the capability of patch status at once | `false` |
| `workflow.enableWatchEventListener` | Enable the capability of watch event listener for a faster reconcile, note that you need to install [kube-trigger](https://github.com/kubevela/kube-trigger) first to use this feature | `false` |
| `workflow.backoff.maxTime.waitState` | The max backoff time of workflow in a wait condition | `60` |
| `workflow.backoff.maxTime.failedState` | The max backoff time of workflow in a failed condition | `300` |
| `workflow.step.errorRetryTimes` | The max retry times of a failed workflow step | `10` |
| Name | Description | Value |
| -------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
| `workflow.enableSuspendOnFailure` | Enable the capability of suspend an failed workflow automatically | `false` |
| `workflow.enablePatchStatusAtOnce` | Enable the capability of patch status at once | `false` |
| `workflow.enableWatchEventListener` | Enable the capability of watch event listener for a faster reconcile, note that you need to install [kube-trigger](https://github.com/kubevela/kube-trigger) first to use this feature | `false` |
| `workflow.backoff.maxTime.waitState` | The max backoff time of workflow in a wait condition | `60` |
| `workflow.backoff.maxTime.failedState` | The max backoff time of workflow in a failed condition | `300` |
| `workflow.step.errorRetryTimes` | The max retry times of a failed workflow step | `10` |
| `workflow.groupByLabel` | The label used to group workflow record | `pipeline.oam.dev/name` |


### KubeVela workflow backup parameters
Expand All @@ -56,7 +57,6 @@ helm install --create-namespace -n vela-system workflow kubevela/vela-workflow -
| `backup.strategy` | The backup strategy for workflow record | `BackupFinishedRecord` |
| `backup.ignoreStrategy` | The ignore strategy for backup | `IgnoreLatestFailedRecord` |
| `backup.cleanOnBackup` | Enable auto clean after backup workflow record | `false` |
| `backup.groupByLabel` | The label used to group workflow record | `""` |
| `backup.persistType` | The persist type for workflow record | `""` |
| `backup.configSecretName` | The secret name of backup config | `backup-config` |
| `backup.configSecretNamespace` | The secret name of backup config namespace | `vela-system` |
Expand Down
2 changes: 1 addition & 1 deletion charts/vela-workflow/templates/workflow-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ spec:
- "--feature-gates=EnablePatchStatusAtOnce={{- .Values.workflow.enablePatchStatusAtOnce | toString -}}"
- "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}"
- "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}"
- "--group-by-label={{ .Values.workflow.groupByLabel }}"
{{ if .Values.backup.enable }}
- "--backup-strategy={{ .Values.backup.strategy }}"
- "--backup-ignore-strategy={{ .Values.backup.ignoreStrategy }}"
- "--backup-group-by-label={{ .Values.backup.groupByLabel }}"
- "--backup-clean-on-backup={{ .Values.backup.cleanOnBackup }}"
- "--backup-persist-type={{ .Values.backup.persisType }}"
- "--backup-config-secret-name={{ .Values.backup.configSecretName }}"
Expand Down
4 changes: 2 additions & 2 deletions charts/vela-workflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ignoreWorkflowWithoutControllerRequirement: false
## @param workflow.backoff.maxTime.waitState The max backoff time of workflow in a wait condition
## @param workflow.backoff.maxTime.failedState The max backoff time of workflow in a failed condition
## @param workflow.step.errorRetryTimes The max retry times of a failed workflow step
## @param workflow.groupByLabel The label used to group workflow record
workflow:
enableSuspendOnFailure: false
enablePatchStatusAtOnce: false
Expand All @@ -30,14 +31,14 @@ workflow:
failedState: 300
step:
errorRetryTimes: 10
groupByLabel: "pipeline.oam.dev/name"

## @section KubeVela workflow backup parameters

## @param backup.enabled Enable backup workflow record
## @param backup.strategy The backup strategy for workflow record
## @param backup.ignoreStrategy The ignore strategy for backup
## @param backup.cleanOnBackup Enable auto clean after backup workflow record
## @param backup.groupByLabel The label used to group workflow record
## @param backup.persistType The persist type for workflow record
## @param backup.configSecretName The secret name of backup config
## @param backup.configSecretNamespace The secret name of backup config namespace
Expand All @@ -46,7 +47,6 @@ backup:
strategy: BackupFinishedRecord
ignoreStrategy: IgnoreLatestFailedRecord
cleanOnBackup: false
groupByLabel: ""
persistType: ""
configSecretName: "backup-config"
configSecretNamespace: "vela-system"
Expand Down
18 changes: 14 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/kubevela/workflow/pkg/features"
"github.com/kubevela/workflow/pkg/monitor/watcher"
"github.com/kubevela/workflow/pkg/types"
"github.com/kubevela/workflow/pkg/utils"
"github.com/kubevela/workflow/pkg/webhook"
"github.com/kubevela/workflow/version"
//+kubebuilder:scaffold:imports
Expand All @@ -82,7 +83,7 @@ func main() {
var qps float64
var logFileMaxSize uint64
var burst, webhookPort int
var leaseDuration, renewDeadline, retryPeriod time.Duration
var leaseDuration, renewDeadline, retryPeriod, recycleDuration time.Duration
var controllerArgs controllers.Args

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
Expand All @@ -100,6 +101,9 @@ func main() {
"The duration that the acting controlplane will retry refreshing leadership before giving up")
flag.DurationVar(&retryPeriod, "leader-election-retry-period", 2*time.Second,
"The duration the LeaderElector clients should wait between tries of actions")
flag.DurationVar(&recycleDuration, "recycle-duration", 30*24*time.Hour,
"The recycle duration of a completed and is not the latest record in a set of workflowruns")

flag.BoolVar(&useWebhook, "use-webhook", false, "Enable Admission Webhook")
flag.StringVar(&certDir, "webhook-cert-dir", "/k8s-webhook-server/serving-certs", "Admission webhook cert/key dir.")
flag.IntVar(&webhookPort, "webhook-port", 9443, "admission webhook listen address")
Expand All @@ -115,7 +119,7 @@ func main() {
flag.StringVar(&backupStrategy, "backup-strategy", "BackupFinishedRecord", "Set the strategy for backup workflow records, default is RemainLatestFailedRecord")
flag.StringVar(&backupIgnoreStrategy, "backup-ignore-strategy", "", "Set the strategy for ignore backup workflow records, default is IgnoreLatestFailedRecord")
flag.StringVar(&backupPersistType, "backup-persist-type", "", "Set the persist type for backup workflow records, default is empty")
flag.StringVar(&groupByLabel, "backup-group-by-label", "", "Set the label for group by, default is empty")
flag.StringVar(&groupByLabel, "group-by-label", "pipeline.oam.dev/name", "Set the label for group by, default is pipeline.oam.dev/name")
flag.BoolVar(&backupCleanOnBackup, "backup-clean-on-backup", false, "Set the auto clean for backup workflow records, default is false")
flag.StringVar(&backupConfigSecretName, "backup-config-secret-name", "backup-config", "Set the secret name for backup workflow configs, default is backup-config")
flag.StringVar(&backupConfigSecretNamespace, "backup-config-secret-namespace", "vela-system", "Set the secret namespace for backup workflow configs, default is backup-config")
Expand Down Expand Up @@ -210,6 +214,14 @@ func main() {
os.Exit(1)
}

kubeClient := mgr.GetClient()
if groupByLabel != "" {
if err := mgr.Add(utils.NewRecycleCronJob(kubeClient, recycleDuration, "0 0 * * *", groupByLabel)); err != nil {
klog.Error(err, "unable to start recycle cronjob")
os.Exit(1)
}
}

pd, err := packages.NewPackageDiscover(mgr.GetConfig())
if err != nil {
klog.Error(err, "Failed to create CRD discovery for CUE package client")
Expand All @@ -228,8 +240,6 @@ func main() {
}
}

kubeClient := mgr.GetClient()

if err = (&controllers.WorkflowRunReconciler{
Client: kubeClient,
Scheme: mgr.GetScheme(),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/onsi/gomega v1.20.2
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.2
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uY
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8=
github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
123 changes: 123 additions & 0 deletions pkg/utils/recycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
Copyright 2023 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"context"
"sort"
"time"

"github.com/kubevela/workflow/api/v1alpha1"
"github.com/robfig/cron/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

// recycleCronJob is the cron job to clean the completed workflow
type recycleCronJob struct {
cli client.Client
duration time.Duration
cron string
label string
}

// NewRecycleCronJob returns a new recycleCronJob
func NewRecycleCronJob(cli client.Client, duration time.Duration, cron, label string) manager.Runnable {
return &recycleCronJob{
cli: cli,
duration: duration,
label: label,
cron: cron,
}
}

func (r *recycleCronJob) Start(ctx context.Context) error {
c, err := r.start(ctx)
if err != nil {
return err
}
defer c.Stop()
<-ctx.Done()
return nil
}

func (r *recycleCronJob) start(ctx context.Context) (*cron.Cron, error) {
c := cron.New(cron.WithChain(
cron.Recover(cron.DefaultLogger),
))
if _, err := c.AddFunc(r.cron, func() {
err := retry.OnError(wait.Backoff{
Steps: 3,
Duration: 1 * time.Minute,
Factor: 5.0,
Jitter: 0.1,
}, func(err error) bool {
// always retry
return true
}, func() error {
if err := r.run(ctx); err != nil {
klog.Errorf("Failed to recycle workflow run: %v", err)
return err
}
klog.Info("Recycle workflow run successfully")
return nil
})
if err != nil {
klog.Errorf("Failed to recycle workflow runs after 3 tries: %v", err)
}
}); err != nil {
return nil, err
}
c.Start()
return c, nil
}

func (r *recycleCronJob) run(ctx context.Context) error {
runs := &v1alpha1.WorkflowRunList{}
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{{Key: r.label, Operator: metav1.LabelSelectorOpExists}}})
if err != nil {
return err
}
listOpt := &client.ListOptions{LabelSelector: selector}
if err := r.cli.List(ctx, runs, listOpt); err != nil {
return err
}
sort.Sort(runs)
items := make(map[string][]v1alpha1.WorkflowRun)
for _, item := range runs.Items {
if v, ok := item.Labels[r.label]; ok {
items[v] = append(items[v], item)
}
}
for _, l := range items {
for i := 1; i < len(l); i++ {
item := l[i]
if item.Status.Finished && time.Since(item.Status.EndTime.Time) > r.duration {
if err := r.cli.Delete(ctx, &item); err != nil {
klog.Errorf("Failed to delete workflowRun %s/%s, error: %v", item.Namespace, item.Name, err)
}
klog.Info("Successfully recycled completed workflowRun %s/%s", item.Namespace, item.Name)
}
}
}
return nil
}
63 changes: 63 additions & 0 deletions pkg/utils/recycle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2023 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kubevela/workflow/api/v1alpha1"
)

func TestRecycleCronJob(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r := require.New(t)
for i := 1; i < 7; i++ {
run := &v1alpha1.WorkflowRun{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("workflow-test-%d", i),
Namespace: "default",
},
}
if i%5 != 0 {
run.Labels = map[string]string{
"pipeline.oam.dev/name": "test",
}
}
err := cli.Create(ctx, run)
r.NoError(err)
run.Status.Finished = i%6 != 0
run.Status.EndTime = metav1.Time{Time: time.Now().AddDate(0, 0, -i)}
err = cli.Status().Update(ctx, run)
r.NoError(err)
defer cli.Delete(ctx, run)
}

runner := NewRecycleCronJob(cli, time.Hour, "@every 1s", "pipeline.oam.dev/name")
err := runner.Start(ctx)
r.NoError(err)
runs := &v1alpha1.WorkflowRunList{}
err = cli.List(ctx, runs)
r.NoError(err)
r.Equal(3, len(runs.Items))
}

0 comments on commit d6d728a

Please sign in to comment.