Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 0.4]Feat: watch event listener in controller for a faster reconcile #144

Merged
merged 4 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions charts/vela-workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ helm install --create-namespace -n vela-system workflow kubevela/vela-workflow -

### KubeVela workflow parameters

| Name | Description | Value |
| -------------------------------------- | ------------------------------------------------------ | ------- |
| `workflow.enableSuspendOnFailure` | Enable suspend on workflow failure | `false` |
| `workflow.backoff.maxTime.waitState` | The max backoff time of workflow in a wait condition | `60` |
| `workflow.backoff.maxTime.failedState` | The max backoff time of workflow in a failed condition | `300` |
| `workflow.step.errorRetryTimes` | The max retry times of a failed workflow step | `10` |
| Name | Description | Value |
| -------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `workflow.enableSuspendOnFailure` | Enable the capability of suspend an failed workflow automatically | `false` |
| `workflow.enablePatchStatusAtOnce` | Enable the capability of patch status at once | `false` |
| `workflow.enableWatchEventListener` | Enable the capability of watch event listener for a faster reconcile, note that you need to install [kube-trigger](https://github.com/kubevela/kube-trigger) first to use this feature | `false` |
| `workflow.backoff.maxTime.waitState` | The max backoff time of workflow in a wait condition | `60` |
| `workflow.backoff.maxTime.failedState` | The max backoff time of workflow in a failed condition | `300` |
| `workflow.step.errorRetryTimes` | The max retry times of a failed workflow step | `10` |


### KubeVela workflow backup parameters
Expand Down
39 changes: 20 additions & 19 deletions charts/vela-workflow/crds/core.oam.dev_workflowruns.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -227,29 +227,30 @@ spec:
type: object
type: array
contextBackend:
description: 'ObjectReference contains enough information to let you
description: "ObjectReference contains enough information to let you
inspect or modify the referred object. --- New uses of this type
are discouraged because of difficulty describing its usage when
embedded in APIs. 1. Ignored fields. It includes many fields which
are not generally honored. For instance, ResourceVersion and FieldPath
are both very rarely valid in actual usage. 2. Invalid usage help. It
is impossible to add specific help for individual usage. In most
embedded usages, there are particular restrictions like, "must refer
only to types A and B" or "UID not honored" or "name must be restricted".
Those cannot be well described when embedded. 3. Inconsistent validation. Because
the usages are different, the validation rules are different by
usage, which makes it hard for users to predict what will happen.
4. The fields are both imprecise and overly precise. Kind is not
a precise mapping to a URL. This can produce ambiguity during interpretation
and require a REST mapping. In most cases, the dependency is on
the group,resource tuple and the version of the actual struct is
irrelevant. 5. We cannot easily change it. Because this type is
embedded in many locations, updates to this type will affect numerous
schemas. Don''t make new APIs embed an underspecified API type
they do not control. Instead of using this type, create a locally
provided and used type that is well-focused on your reference. For
example, ServiceReferences for admission registration: https://github.com/kubernetes/api/blob/release-1.17/admissionregistration/v1/types.go#L533
.'
are both very rarely valid in actual usage. 2. Invalid usage help.
\ It is impossible to add specific help for individual usage. In
most embedded usages, there are particular restrictions like, \"must
refer only to types A and B\" or \"UID not honored\" or \"name must
be restricted\". Those cannot be well described when embedded. 3.
Inconsistent validation. Because the usages are different, the
validation rules are different by usage, which makes it hard for
users to predict what will happen. 4. The fields are both imprecise
and overly precise. Kind is not a precise mapping to a URL. This
can produce ambiguity during interpretation and require a REST mapping.
\ In most cases, the dependency is on the group,resource tuple and
the version of the actual struct is irrelevant. 5. We cannot easily
change it. Because this type is embedded in many locations, updates
to this type will affect numerous schemas. Don't make new APIs
embed an underspecified API type they do not control. \n Instead
of using this type, create a locally provided and used type that
is well-focused on your reference. For example, ServiceReferences
for admission registration: https://github.com/kubernetes/api/blob/release-1.17/admissionregistration/v1/types.go#L533
."
properties:
apiVersion:
description: API version of the referent.
Expand Down
2 changes: 2 additions & 0 deletions charts/vela-workflow/templates/workflow-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ spec:
- "--max-workflow-wait-backoff-time={{ .Values.workflow.backoff.maxTime.waitState }}"
- "--max-workflow-failed-backoff-time={{ .Values.workflow.backoff.maxTime.failedState }}"
- "--max-workflow-step-error-retry-times={{ .Values.workflow.step.errorRetryTimes }}"
- "--feature-gates=EnableWatchEventListener={{- .Values.workflow.enableWatchEventListener | toString -}}"
- "--feature-gates=EnablePatchStatusAtOnce={{- .Values.workflow.enablePatchStatusAtOnce | toString -}}"
- "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}"
- "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}"
{{ if .Values.backup.enable }}
Expand Down
6 changes: 5 additions & 1 deletion charts/vela-workflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ ignoreWorkflowWithoutControllerRequirement: false

## @section KubeVela workflow parameters

## @param workflow.enableSuspendOnFailure Enable suspend on workflow failure
## @param workflow.enableSuspendOnFailure Enable the capability of suspend an failed workflow automatically
## @param workflow.enablePatchStatusAtOnce Enable the capability of patch status at once
## @param workflow.enableWatchEventListener Enable the capability of watch event listener for a faster reconcile, note that you need to install [kube-trigger](https://github.com/kubevela/kube-trigger) first to use this feature
## @param workflow.backoff.maxTime.waitState The max backoff time of workflow in a wait condition
## @param workflow.backoff.maxTime.failedState The max backoff time of workflow in a failed condition
## @param workflow.step.errorRetryTimes The max retry times of a failed workflow step
workflow:
enableSuspendOnFailure: false
enablePatchStatusAtOnce: false
enableWatchEventListener: false
backoff:
maxTime:
waitState: 60
Expand Down
5 changes: 5 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"

triggerv1alpha1 "github.com/kubevela/kube-trigger/api/v1alpha1"
velaclient "github.com/kubevela/pkg/controller/client"
"github.com/kubevela/pkg/multicluster"

Expand Down Expand Up @@ -181,6 +182,10 @@ func main() {
)
restConfig.UserAgent = userAgent

if feature.DefaultMutableFeatureGate.Enabled(features.EnableWatchEventListener) {
utilruntime.Must(triggerv1alpha1.AddToScheme(scheme))
}

leaderElectionID := fmt.Sprintf("workflow-%s", strings.ToLower(strings.ReplaceAll(version.VelaVersion, ".", "-")))
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
Expand Down
30 changes: 27 additions & 3 deletions controllers/workflowrun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,26 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/util/feature"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
ctrlEvent "sigs.k8s.io/controller-runtime/pkg/event"
ctrlHandler "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

triggerv1alpha1 "github.com/kubevela/kube-trigger/api/v1alpha1"
monitorContext "github.com/kubevela/pkg/monitor/context"

"github.com/kubevela/workflow/api/condition"
"github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/packages"
"github.com/kubevela/workflow/pkg/executor"
"github.com/kubevela/workflow/pkg/features"
"github.com/kubevela/workflow/pkg/generator"
"github.com/kubevela/workflow/pkg/monitor/metrics"
"github.com/kubevela/workflow/pkg/types"
Expand Down Expand Up @@ -195,16 +202,27 @@ func (r *WorkflowRunReconciler) matchControllerRequirement(wr *v1alpha1.Workflow

// SetupWithManager sets up the controller with the Manager.
func (r *WorkflowRunReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
builder := ctrl.NewControllerManagedBy(mgr)
if feature.DefaultMutableFeatureGate.Enabled(features.EnableWatchEventListener) {
builder = builder.Watches(&source.Kind{
Type: &triggerv1alpha1.EventListener{},
}, ctrlHandler.EnqueueRequestsFromMapFunc(findObjectForEventListener))
}
return builder.
WithOptions(controller.Options{
MaxConcurrentReconciles: r.ConcurrentReconciles,
}).
WithEventFilter(predicate.Funcs{
// filter the changes in workflow status
// let workflow handle its reconcile
UpdateFunc: func(e ctrlEvent.UpdateEvent) bool {
new := e.ObjectNew.DeepCopyObject().(*v1alpha1.WorkflowRun)
old := e.ObjectOld.DeepCopyObject().(*v1alpha1.WorkflowRun)
new, isNewWR := e.ObjectNew.DeepCopyObject().(*v1alpha1.WorkflowRun)
old, isOldWR := e.ObjectOld.DeepCopyObject().(*v1alpha1.WorkflowRun)

// if the object is a event listener, reconcile the controller
if !isNewWR || !isOldWR {
return true
}

// if the workflow is finished, skip the reconcile
if new.Status.Finished {
Expand Down Expand Up @@ -278,3 +296,9 @@ func timeReconcile(wr *v1alpha1.WorkflowRun) func() {
metrics.WorkflowRunReconcileTimeHistogram.WithLabelValues(beginPhase, string(wr.Status.Phase)).Observe(v)
}
}

func findObjectForEventListener(object client.Object) []reconcile.Request {
return []reconcile.Request{{
NamespacedName: k8stypes.NamespacedName{Name: object.GetName(), Namespace: object.GetNamespace()},
}}
}
Loading