Skip to content
2 changes: 1 addition & 1 deletion cmd/memberagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
// resource processing.
5,
// Use the default worker count (4) for parallelized manifest processing.
parallelizer.DefaultNumOfWorkers,
parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
time.Minute*time.Duration(*deletionWaitTime),
*watchWorkWithPriorityQueue,
*watchWorkReconcileAgeMinutes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, 1, time.Minute, false, 60, nil)
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)

propertyProvider1 = &manuallyUpdatedProvider{}
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
Expand All @@ -402,7 +402,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, 1, time.Minute, false, 60, nil)
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)

member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
Expect(err).NotTo(HaveOccurred())
Expand Down
13 changes: 9 additions & 4 deletions pkg/controllers/workapplier/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
"github.com/kubefleet-dev/kubefleet/pkg/utils/defaulter"
"github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
parallelizerutil "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
)

const (
Expand Down Expand Up @@ -227,7 +227,7 @@ type Reconciler struct {
watchWorkReconcileAgeMinutes int
deletionWaitTime time.Duration
joined *atomic.Bool
parallelizer *parallelizer.Parallerlizer
parallelizer parallelizerutil.Parallelizer
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter
}

Expand All @@ -237,15 +237,20 @@ func NewReconciler(
spokeDynamicClient dynamic.Interface, spokeClient client.Client, restMapper meta.RESTMapper,
recorder record.EventRecorder,
concurrentReconciles int,
workerCount int,
parallelizer parallelizerutil.Parallelizer,
deletionWaitTime time.Duration,
watchWorkWithPriorityQueue bool,
watchWorkReconcileAgeMinutes int,
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter,
) *Reconciler {
if requeueRateLimiter == nil {
klog.V(2).InfoS("requeue rate limiter is not set; using the default rate limiter")
requeueRateLimiter = defaultRequeueRateLimiter
}
if parallelizer == nil {
klog.V(2).InfoS("parallelizer is not set; using the default parallelizer with a worker count of 1")
parallelizer = parallelizerutil.NewParallelizer(1)
}

return &Reconciler{
hubClient: hubClient,
Expand All @@ -254,7 +259,7 @@ func NewReconciler(
restMapper: restMapper,
recorder: recorder,
concurrentReconciles: concurrentReconciles,
parallelizer: parallelizer.NewParallelizer(workerCount),
parallelizer: parallelizer,
watchWorkWithPriorityQueue: watchWorkWithPriorityQueue,
watchWorkReconcileAgeMinutes: watchWorkReconcileAgeMinutes,
workNameSpace: workNameSpace,
Expand Down
100 changes: 51 additions & 49 deletions pkg/controllers/workapplier/controller_integration_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/controllers/workapplier/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *Reconciler) processManifests(
klog.V(2).InfoS("Processed a manifest", "manifestObj", klog.KObj(bundlesInWave[piece].manifestObj), "work", klog.KObj(work))
}

r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, "processingManifests")
r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, fmt.Sprintf("processingManifestsInWave%d", idx))
}
}

Expand Down
123 changes: 120 additions & 3 deletions pkg/controllers/workapplier/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"flag"
"path/filepath"
"strings"
"sync"
"testing"
"time"
Expand All @@ -32,6 +33,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/klog/v2/textlogger"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -46,6 +48,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
"github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
testv1alpha1 "github.com/kubefleet-dev/kubefleet/test/apis/v1alpha1"
)

Expand All @@ -70,6 +73,13 @@ var (
memberDynamicClient2 dynamic.Interface
workApplier2 *Reconciler

memberCfg3 *rest.Config
memberEnv3 *envtest.Environment
hubMgr3 manager.Manager
memberClient3 client.Client
memberDynamicClient3 dynamic.Interface
workApplier3 *Reconciler

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand All @@ -83,8 +93,33 @@ const (

memberReservedNSName1 = "fleet-member-experimental-1"
memberReservedNSName2 = "fleet-member-experimental-2"
memberReservedNSName3 = "fleet-member-experimental-3"

parallelizerFixedDelay = time.Second * 5
)

// parallelizerWithFixedDelay is a Parallelizer implementation that runs
// tasks in parallel with a fixed delay after completing each task group.
//
// This is added to help verify the behavior of waved parallel processing in the work applier.
type parallelizerWithFixedDelay struct {
// regularParallelizer performs the actual parallel execution of the task group.
regularParallelizer parallelizer.Parallelizer
// delay is the fixed wait applied after a waved task group completes
// (see ParallelizeUntil; non-waved operations are not delayed).
delay time.Duration
}

// ParallelizeUntil runs the given pieces of work via the wrapped regular
// parallelizer, then sleeps for the fixed delay — but only after waved
// manifest-processing operations — so that tests can reliably observe wave
// boundaries in the work applier.
func (p *parallelizerWithFixedDelay) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) {
	p.regularParallelizer.ParallelizeUntil(ctx, pieces, doWork, operation)

	// Apply the delay only to waved operations; other operations complete
	// without waiting. Logging happens inside the guard so that the log
	// never claims a wait is occurring when none does.
	if strings.HasPrefix(operation, "processingManifestsInWave") {
		klog.V(2).InfoS("Waiting with a fixed delay after processing a wave", "operation", operation, "delay", p.delay)
		time.Sleep(p.delay)
	}
}

// Compile-time assertion that parallelizerWithFixedDelay satisfies the
// parallelizer.Parallelizer interface.
var _ parallelizer.Parallelizer = &parallelizerWithFixedDelay{}

func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)

Expand All @@ -105,6 +140,13 @@ func setupResources() {
},
}
Expect(hubClient.Create(ctx, ns2)).To(Succeed())

ns3 := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: memberReservedNSName3,
},
}
Expect(hubClient.Create(ctx, ns3)).To(Succeed())
}

var _ = BeforeSuite(func() {
Expand Down Expand Up @@ -143,6 +185,14 @@ var _ = BeforeSuite(func() {
filepath.Join("../../../", "test", "manifests"),
},
}
// memberEnv3 is the test environment for verifying the behavior of waved parallel processing in
// the work applier.
memberEnv3 = &envtest.Environment{
CRDDirectoryPaths: []string{
filepath.Join("../../../", "config", "crd", "bases"),
filepath.Join("../../../", "test", "manifests"),
},
}

var err error
hubCfg, err = hubEnv.Start()
Expand All @@ -157,6 +207,14 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())
Expect(memberCfg2).ToNot(BeNil())

memberCfg3, err = memberEnv3.Start()
Expect(err).ToNot(HaveOccurred())
Expect(memberCfg3).ToNot(BeNil())

// NOTE(review): memberEnv2 appears to be started a second time here — the
// context lines just above (checking memberCfg2) suggest memberEnv2.Start()
// was already called earlier in this function. Starting the same
// envtest.Environment twice is likely a merge artifact; confirm and remove
// the duplicate block below if so.
memberCfg2, err = memberEnv2.Start()
Expect(err).ToNot(HaveOccurred())
Expect(memberCfg2).ToNot(BeNil())

err = batchv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
err = fleetv1beta1.AddToScheme(scheme.Scheme)
Expand All @@ -177,13 +235,20 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())
Expect(memberClient2).ToNot(BeNil())

memberClient3, err = client.New(memberCfg3, client.Options{Scheme: scheme.Scheme})
Expect(err).ToNot(HaveOccurred())
Expect(memberClient3).ToNot(BeNil())

// This setup also requires a client-go dynamic client for the member cluster.
memberDynamicClient1, err = dynamic.NewForConfig(memberCfg1)
Expect(err).ToNot(HaveOccurred())

memberDynamicClient2, err = dynamic.NewForConfig(memberCfg2)
Expect(err).ToNot(HaveOccurred())

memberDynamicClient3, err = dynamic.NewForConfig(memberCfg3)
Expect(err).ToNot(HaveOccurred())

By("Setting up the resources")
setupResources()

Expand All @@ -210,7 +275,7 @@ var _ = BeforeSuite(func() {
memberClient1.RESTMapper(),
hubMgr1.GetEventRecorderFor("work-applier"),
maxConcurrentReconciles,
workerCount,
parallelizer.NewParallelizer(workerCount),
30*time.Second,
true,
60,
Expand Down Expand Up @@ -259,7 +324,7 @@ var _ = BeforeSuite(func() {
memberClient2.RESTMapper(),
hubMgr2.GetEventRecorderFor("work-applier"),
maxConcurrentReconciles,
workerCount,
parallelizer.NewParallelizer(workerCount),
30*time.Second,
true,
60,
Expand All @@ -274,8 +339,52 @@ var _ = BeforeSuite(func() {
Complete(workApplier2)
Expect(err).NotTo(HaveOccurred())

By("Setting up the controller and the controller manager for member cluster 3")
hubMgr3, err = ctrl.NewManager(hubCfg, ctrl.Options{
Scheme: scheme.Scheme,
Metrics: server.Options{
BindAddress: "0",
},
Cache: cache.Options{
DefaultNamespaces: map[string]cache.Config{
memberReservedNSName3: {},
},
},
Logger: textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(4))),
})
Expect(err).ToNot(HaveOccurred())

pWithDelay := &parallelizerWithFixedDelay{
regularParallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
// To avoid flakiness, use a fixed delay of 5 seconds so that we could reliably verify
// if manifests are actually being processed in waves.
delay: parallelizerFixedDelay,
}
workApplier3 = NewReconciler(
hubClient,
memberReservedNSName3,
memberDynamicClient3,
memberClient3,
memberClient3.RESTMapper(),
hubMgr3.GetEventRecorderFor("work-applier"),
maxConcurrentReconciles,
pWithDelay,
30*time.Second,
true,
60,
nil, // Use the default backoff rate limiter.
)
// Due to name conflicts, the third work applier must be set up manually.
err = ctrl.NewControllerManagedBy(hubMgr3).Named("work-applier-controller-waved-parallel-processing").
WithOptions(ctrloption.Options{
MaxConcurrentReconciles: workApplier3.concurrentReconciles,
}).
For(&fleetv1beta1.Work{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(workApplier3)
Expect(err).NotTo(HaveOccurred())

wg = sync.WaitGroup{}
wg.Add(2)
wg.Add(3)
go func() {
defer GinkgoRecover()
defer wg.Done()
Expand All @@ -289,6 +398,13 @@ var _ = BeforeSuite(func() {
Expect(workApplier2.Join(ctx)).To(Succeed())
Expect(hubMgr2.Start(ctx)).To(Succeed())
}()

go func() {
defer GinkgoRecover()
defer wg.Done()
Expect(workApplier3.Join(ctx)).To(Succeed())
Expect(hubMgr3.Start(ctx)).To(Succeed())
}()
})

var _ = AfterSuite(func() {
Expand All @@ -300,4 +416,5 @@ var _ = AfterSuite(func() {
Expect(hubEnv.Stop()).To(Succeed())
Expect(memberEnv1.Stop()).To(Succeed())
Expect(memberEnv2.Stop()).To(Succeed())
Expect(memberEnv3.Stop()).To(Succeed())
})
Loading
Loading