53 changes: 45 additions & 8 deletions pkg/controllers/workapplier/process.go
@@ -37,15 +37,52 @@ func (r *Reconciler) processManifests(
work *fleetv1beta1.Work,
expectedAppliedWorkOwnerRef *metav1.OwnerReference,
) {
-	// TODO: We have to apply the namespace/crd/secret/configmap/pvc first
-	// then we can process some of the manifests in parallel.
-	for _, bundle := range bundles {
-		if bundle.applyOrReportDiffErr != nil {
-			// Skip a manifest if it has failed pre-processing.
-			continue
+	// Process all manifests in parallel.
+	//
+	// There are cases where certain groups of manifests should not be processed in parallel with
+	// each other (e.g., a config map must be applied after its owner namespace is applied);
+	// to address this situation, manifests are processed in waves: manifests in the same wave are
+	// processed in parallel, while different waves are processed sequentially.
+
+	// As a special case, if the ReportDiff mode is on, all manifests are processed in parallel in
+	// one wave.
+	if work.Spec.ApplyStrategy != nil && work.Spec.ApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeReportDiff {
+		doWork := func(piece int) {
+			if bundles[piece].applyOrReportDiffErr != nil {
+				// Skip a manifest if it has failed pre-processing.
+				return
+			}
+
+			r.processOneManifest(ctx, bundles[piece], work, expectedAppliedWorkOwnerRef)
+			klog.V(2).InfoS("Processed a manifest", "manifestObj", klog.KObj(bundles[piece].manifestObj), "work", klog.KObj(work))
+		}
-		r.processOneManifest(ctx, bundle, work, expectedAppliedWorkOwnerRef)
-		klog.V(2).InfoS("Processed a manifest", "manifestObj", klog.KObj(bundle.manifestObj), "work", klog.KObj(work))
+
+		r.parallelizer.ParallelizeUntil(ctx, len(bundles), doWork, "processingManifestsInReportDiffMode")
+		return
+	}
+
+	// Organize the bundles into different waves of bundles for parallel processing based on their
+	// GVR information.
+	processingWaves := organizeBundlesIntoProcessingWaves(bundles, klog.KObj(work))
+	for idx := range processingWaves {
+		bundlesInWave := processingWaves[idx].bundles
+
+		// TO-DO (chenyu1): evaluate if there is a need to avoid repeated closure
+		// assignment just for capturing variables.
+		doWork := func(piece int) {
+			if bundlesInWave[piece].applyOrReportDiffErr != nil {
+				// Skip a manifest if it has failed pre-processing.
+				//
+				// This is added as a sanity check, as the organization step normally
+				// would have already skipped all the manifests with processing failures.
+				return
+			}
+
+			r.processOneManifest(ctx, bundlesInWave[piece], work, expectedAppliedWorkOwnerRef)
+			klog.V(2).InfoS("Processed a manifest", "manifestObj", klog.KObj(bundlesInWave[piece].manifestObj), "work", klog.KObj(work))
+		}
+
+		r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, "processingManifests")
+	}
 }
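
For intuition, here is a minimal standalone sketch of the control flow above: waves run sequentially, while the manifests inside a wave run in parallel. The processWaves helper and the string items are illustrative stand-ins, and the project's r.parallelizer.ParallelizeUntil is replaced with a plain sync.WaitGroup purely for illustration.

package main

import (
	"fmt"
	"sync"
)

// processWaves mirrors the shape of processManifests: each wave must finish
// before the next one starts, while the items inside a wave run concurrently.
func processWaves(waves [][]string) {
	for i, wave := range waves {
		var wg sync.WaitGroup
		for _, item := range wave {
			wg.Add(1)
			go func(item string) {
				defer wg.Done()
				fmt.Printf("wave %d: processing %s\n", i, item)
			}(item)
		}
		// Block until the whole wave completes, so that ordering constraints
		// between waves (e.g., a namespace before the config maps in it) hold.
		wg.Wait()
	}
}

func main() {
	processWaves([][]string{
		{"ns/app"},                     // wave 0: namespaces
		{"cm/app-config", "secret/db"}, // wave 1: configuration data
	})
}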

180 changes: 180 additions & 0 deletions pkg/controllers/workapplier/waves.go
@@ -0,0 +1,180 @@
/*
Copyright 2025 The KubeFleet Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workapplier

import (
"slices"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)

type waveNumber int

const (
lastWave waveNumber = 999
)

var (
// The default wave numbers for all known Kubernetes resource types.
//
// Note (chenyu1): the waves below are based on the Helm resource installation
// order (see also the Helm source code). Similar objects are grouped together
// to achieve the best performance.
defaultWaveNumberByResourceType = map[string]waveNumber{
Collaborator
we also defined the apply order in https://github.com/kubefleet-dev/kubefleet/blob/main/pkg/controllers/placement/resource_selector.go#L47

The orders are not consistent; for example, the ingressclasses.

Can we try to merge them, so that it's always consistent?

Collaborator Author — @michaelawyu, Sep 3, 2025
Hi Zhiying! Yeah, I was a bit concerned about this too when composing the PR. The reason why we did it separately in this PR was that, if we do not group the resource types but process them individually, there's a high chance that each batch has only 1-2 objects, which kind of defeats the purpose of doing the parallelization.

Collaborator Author
The PR also kind of renders the apply order sorting we did on the hub cluster side redundant (we still need to sort the resources for stability reasons, but they do not have to be in a specific order -> similar to how the work generator sorts enveloped objects).

Collaborator Author

At this moment I am leaning toward grouping the resource types for larger batch sizes, but I do not have a particularly strong opinion on the subject matter -> happy to discuss the topic further.

Collaborator Author
As for the apply order part on the hub cluster, will submit another PR to keep things clean after this one is merged.

Collaborator Author — @michaelawyu, Sep 3, 2025
(A side note: another reason I am a bit reluctant to change the apply order on the hub cluster side was that it might trigger a rollout on existing workloads, as it will register as a new resource snapshot IIRC? -> we might need to be a bit careful on this 😞)

Contributor
These two orders serve different purposes. The hub one was the apply order before this PR, but now it's more about keeping a list of resources unique (so we don't think there is any change), while the member side is now the real apply order. We should probably rename the hub-side list to reflect the fact that it's no longer the order in which the member will apply.

// Apply namespaces and priority classes first.
"namespaces": 0,
"priorityclasses": 0,
// Apply policies, configuration data, and other static resources second.
"networkpolicies": 1,
"resourcequotas": 1,
"limitranges": 1,
"podsecuritypolicies": 1,
"poddisruptionbudgets": 1,
"serviceaccounts": 1,
"secrets": 1,
"configmaps": 1,
"storageclasses": 1,
"persistentvolumes": 1,
"persistentvolumeclaims": 1,
"customresourcedefinitions": 1,
"ingressclasses": 1,
// Apply RBAC resources (cluster roles and roles).
"clusterroles": 2,
"roles": 2,
// Apply RBAC resources (cluster role bindings and role bindings).
"clusterrolebindings": 3,
"rolebindings": 3,
// Apply workloads and services.
"services": 4,
"daemonsets": 4,
"pods": 4,
"replicationcontrollers": 4,
"replicasets": 4,
"deployments": 4,
"horizontalpodautoscalers": 4,
"statefulsets": 4,
"jobs": 4,
"cronjobs": 4,
"ingresses": 4,
// Apply API services and webhooks.
"apiservices": 5,
"validatingwebhookconfigurations": 5,
"mutatingwebhookconfigurations": 5,
}

// The API groups for all known Kubernetes resource types.
knownAPIGroups = sets.New(
// The core API group.
"",
// The networking API group (`networking.k8s.io`).
"networking.k8s.io",
// The scheduling API group (`scheduling.k8s.io`).
"scheduling.k8s.io",
// The policy API group (`policy`).
"policy",
// The storage API group (`storage.k8s.io`).
"storage.k8s.io",
// The API extensions API group (`apiextensions.k8s.io`).
"apiextensions.k8s.io",
// The RBAC authorization API group (`rbac.authorization.k8s.io`).
"rbac.authorization.k8s.io",
// The apps API group (`apps`).
"apps",
// The autoscaling API group (`autoscaling`).
"autoscaling",
// The API registration API group (`apiregistration.k8s.io`).
"apiregistration.k8s.io",
// The admission registration API group (`admissionregistration.k8s.io`).
"admissionregistration.k8s.io",
// The batch API group (`batch`).
"batch",
)
)

// bundleProcessingWave is a wave of bundles that can be processed in parallel.
type bundleProcessingWave struct {
num waveNumber
bundles []*manifestProcessingBundle
}

// organizeBundlesIntoProcessingWaves organizes the list of bundles into different
// waves of bundles for parallel processing based on their GVR information.
func organizeBundlesIntoProcessingWaves(bundles []*manifestProcessingBundle, workRef klog.ObjectRef) []*bundleProcessingWave {
// Pre-allocate the map; 7 covers the six default waves plus the catch-all
// last wave, though not all wave numbers might be used.
waveByNum := make(map[waveNumber]*bundleProcessingWave, 7)

Comment on lines +118 to +121

Contributor
nit: shouldn't this be a bundleProcessingWave array instead? One can pre-create all the bundleProcessingWave entries so there is no need for the getOrAddWave function anymore (but we can keep it to save memory). I am not sure why we need to sort the map to produce an array at the end.
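
To illustrate the suggestion, a minimal sketch of the fixed-array alternative, reusing the types from this file; slotFor is a hypothetical helper and the bundle-assignment step is elided:

// Indices 0 through 5 hold the six default waves; index 6 holds the
// catch-all last wave.
preallocated := [7]*bundleProcessingWave{}
for i := 0; i < 6; i++ {
	preallocated[i] = &bundleProcessingWave{num: waveNumber(i)}
}
preallocated[6] = &bundleProcessingWave{num: lastWave}

// slotFor maps a wave number to its array slot.
slotFor := func(num waveNumber) int {
	if num == lastWave {
		return 6
	}
	return int(num)
}

// ... assign each bundle as in the loop below, e.g.:
// w := preallocated[slotFor(waveNum)]
// w.bundles = append(w.bundles, bundle)

// Collect only the non-empty waves; the array is already in ascending
// order, so no sort step is needed.
waves := make([]*bundleProcessingWave, 0, len(preallocated))
for _, w := range preallocated {
	if len(w.bundles) > 0 {
		waves = append(waves, w)
	}
}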

getOrAddWave := func(num waveNumber) *bundleProcessingWave {
wave, ok := waveByNum[num]
if !ok {
wave = &bundleProcessingWave{
num: num,
// Pre-allocate with a reasonable size.
bundles: make([]*manifestProcessingBundle, 0, 5),
}
waveByNum[num] = wave
}
return wave
}

// For simplicity, the organization step itself runs sequentially.
// Considering that the categorization is quick and the total number of bundles
// should be limited in most cases, this should not introduce significant overhead.
for idx := range bundles {
bundle := bundles[idx]
if bundle.gvr == nil {
// For manifest data that cannot be decoded, there might not be any available GVR
// information. Skip such processing bundles; this is not considered an error.
klog.V(2).InfoS("Skipping a bundle with no GVR; no wave is assigned",
"ordinal", idx, "work", workRef)
continue
}

if bundle.applyOrReportDiffErr != nil {
// An error has occurred before this step; such bundles need no further processing,
// so skip them. This is not considered an error.
klog.V(2).InfoS("Skipping a bundle with prior processing error; no wave is assigned",
"manifestObj", klog.KObj(bundle.manifestObj), "GVR", *bundle.gvr, "work", workRef)
continue
}

waveNum := lastWave
defaultWaveNum, foundInDefaultWaveNumber := defaultWaveNumberByResourceType[bundle.gvr.Resource]
if foundInDefaultWaveNumber && knownAPIGroups.Has(bundle.gvr.Group) {
// The resource is a known one; assign the bundle to its default wave.
waveNum = defaultWaveNum
}

wave := getOrAddWave(waveNum)
wave.bundles = append(wave.bundles, bundle)
klog.V(2).InfoS("Assigned manifest to a wave",
"waveNumber", waveNum,
"manifestObj", klog.KObj(bundle.manifestObj), "GVR", *bundle.gvr, "work", workRef)
}

// Retrieve all the waves and sort them by their wave number.
waves := make([]*bundleProcessingWave, 0, len(waveByNum))
for _, w := range waveByNum {
waves = append(waves, w)
}
// Sort the waves in ascending order.
slices.SortFunc(waves, func(a, b *bundleProcessingWave) int {
return int(a.num) - int(b.num)
})
return waves
}
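
For reference, a minimal standalone sketch of the categorization rule implemented above, with a trimmed stand-in for defaultWaveNumberByResourceType (the API group check against knownAPIGroups is omitted here): unknown resource types fall into the catch-all last wave, and waves are sorted into ascending order before processing.

package main

import (
	"fmt"
	"sort"
)

const lastWave = 999

// A trimmed stand-in for defaultWaveNumberByResourceType.
var waveByResource = map[string]int{
	"namespaces":   0,
	"configmaps":   1,
	"roles":        2,
	"rolebindings": 3,
	"deployments":  4,
	"apiservices":  5,
}

func main() {
	// "widgets" stands in for a custom resource with no default wave.
	resources := []string{"deployments", "widgets", "namespaces", "configmaps"}

	wavesByNum := map[int][]string{}
	for _, r := range resources {
		num, ok := waveByResource[r]
		if !ok {
			num = lastWave // unknown types run in the final wave
		}
		wavesByNum[num] = append(wavesByNum[num], r)
	}

	// Sort the wave numbers so waves run in ascending order.
	nums := make([]int, 0, len(wavesByNum))
	for n := range wavesByNum {
		nums = append(nums, n)
	}
	sort.Ints(nums)
	for _, n := range nums {
		fmt.Printf("wave %d: %v\n", n, wavesByNum[n])
	}
	// Output:
	// wave 0: [namespaces]
	// wave 1: [configmaps]
	// wave 4: [deployments]
	// wave 999: [widgets]
}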