Description
Problem
After dcontroller restarts, multi-source join pipelines using watcher sources may silently produce no output.
Watcher sources use IncrementalReconciler, which processes events one at a time via pipeline.Evaluate(). On restart, informers re-list existing resources and deliver them as Added events. But informer caches sync in parallel, so events from source A can arrive and try to join against source B's cache while it is still empty. The join then returns an empty result. When source B's events arrive later, source A's events have already been consumed and are not re-triggered.
StateOfTheWorldReconciler (used for periodic sources) already handles this correctly via Pipeline.Sync(), which re-evaluates all current state. But IncrementalReconciler never calls it.
Observed Behavior
A 3-way join (CustomResourceA + CustomResourceB + Secret) stops producing output after an OOM restart. All three sources exist, yet the pipeline returns "result": "[]". The problem is resolved only by patching an input resource to trigger a new event.
Proposed Fix
After all informer caches for a controller's sources are synced, call Pipeline.Sync() once in the IncrementalReconciler path:
// After cache.WaitForCacheSync() for all sources:
deltas, err := pipeline.Sync()
// Apply deltas to target

The Sync() method and target write logic already exist in StateOfTheWorldReconciler; the incremental path just needs to invoke it once at startup.
References
pkg/controller/reconcilers.go:36: IncrementalReconciler.Reconcile (no Sync)
pkg/controller/reconcilers.go:102: StateOfTheWorldReconciler.Reconcile (calls Sync)
pkg/controller/declarative.go:97-102: reconciler selection based on source type
pkg/pipeline/pipeline.go:247: Pipeline.Sync() implementation