From 26bff8e29f421c722d06c76dff8cb8b5fe0fe3d6 Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Mon, 11 May 2020 17:22:12 +0200 Subject: [PATCH] Make tests faster by fast polling --- controller/controller.go | 40 ++- controller/controller_test.go | 68 +++-- internal/testutils/mock_source.go | 20 +- main.go | 28 +-- .../async/bounded_frequency_runner.go | 238 ------------------ source/cloudfoundry.go | 4 +- source/connector.go | 3 +- source/crd.go | 4 +- source/dedup_source.go | 6 +- source/empty.go | 4 +- source/fake.go | 3 +- source/gateway.go | 3 +- source/ingress.go | 26 +- source/ingressroute.go | 3 +- source/multi_source.go | 6 +- source/node.go | 3 +- source/ocproute.go | 3 +- source/routegroup.go | 3 +- source/service.go | 26 +- source/source.go | 6 +- source/source_test.go | 35 +++ 21 files changed, 169 insertions(+), 363 deletions(-) delete mode 100644 pkg/k8sutils/async/bounded_frequency_runner.go diff --git a/controller/controller.go b/controller/controller.go index cc4feefb72..3be2f5b4c6 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -112,6 +113,10 @@ type Controller struct { Interval time.Duration // The DomainFilter defines which DNS records to keep or exclude DomainFilter endpoint.DomainFilter + // The nextRunAt used for throttling and batching reconciliation + nextRunAt time.Time + // The nextRunAtMux is for atomic updating of nextRunAt + nextRunAtMux sync.Mutex } // RunOnce runs a single iteration of a reconciliation loop. @@ -154,18 +159,39 @@ func (c *Controller) RunOnce(ctx context.Context) error { return nil } -// Run runs RunOnce in a loop with a delay until stopChan receives a value. -func (c *Controller) Run(ctx context.Context, stopChan <-chan struct{}) { - ticker := time.NewTicker(c.Interval) +// MIN_INTERVAL is used as window for batching events +const MIN_INTERVAL = 5 * time.Second + +// RunOnceThrottled makes sure execution happens at most once per interval. +func (c *Controller) ScheduleRunOnce(now time.Time) { + c.nextRunAtMux.Lock() + defer c.nextRunAtMux.Unlock() + c.nextRunAt = now.Add(MIN_INTERVAL) +} + +func (c *Controller) ShouldRunOnce(now time.Time) bool { + c.nextRunAtMux.Lock() + defer c.nextRunAtMux.Unlock() + if now.Before(c.nextRunAt) { + return false + } + c.nextRunAt = now.Add(c.Interval) + return true +} + +// Run runs RunOnce in a loop with a delay until context is cancelled +func (c *Controller) Run(ctx context.Context) { + ticker := time.NewTicker(time.Second) defer ticker.Stop() for { - err := c.RunOnce(ctx) - if err != nil { - log.Error(err) + if c.ShouldRunOnce(time.Now()) { + if err := c.RunOnce(ctx); err != nil { + log.Error(err) + } } select { case <-ticker.C: - case <-stopChan: + case <-ctx.Done(): log.Info("Terminating main controller loop") return } diff --git a/controller/controller_test.go b/controller/controller_test.go index 5a7cb74250..d019b259c5 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -153,43 +153,41 @@ func TestRunOnce(t *testing.T) { source.AssertExpectations(t) } -// TestSourceEventHandler tests that the Controller can use a Source's registered handler as a callback. -func TestSourceEventHandler(t *testing.T) { - source := new(testutils.MockSource) +func TestShouldRunOnce(t *testing.T) { + ctrl := &Controller{Interval: 10 * time.Minute} - handlerCh := make(chan bool) - timeoutCh := make(chan bool, 1) - stopChan := make(chan struct{}, 1) + now := time.Now() - ctrl := &Controller{ - Source: source, - Registry: nil, - Policy: &plan.SyncPolicy{}, - } + // First run of Run loop should execute RunOnce + assert.True(t, ctrl.ShouldRunOnce(now)) - // Define and register a simple handler that sends a message to a channel to show it was called. - handler := func() error { - handlerCh <- true - return nil - } - // Example of preventing handler from being called more than once every 5 seconds. - ctrl.Source.AddEventHandler(handler, stopChan, 5*time.Second) - - // Send timeout message after 10 seconds to fail test if handler is not called. - go func() { - time.Sleep(10 * time.Second) - timeoutCh <- true - }() - - // Wait until we either receive a message from handlerCh or timeoutCh channel after 10 seconds. - select { - case msg := <-handlerCh: - assert.True(t, msg) - case <-timeoutCh: - assert.Fail(t, "timed out waiting for event handler to be called") - } + // Second run should not + assert.False(t, ctrl.ShouldRunOnce(now)) + + now = now.Add(10 * time.Second) + // Changes happen in ingresses or services + ctrl.ScheduleRunOnce(now) + ctrl.ScheduleRunOnce(now) + + // Because we batch changes, ShouldRunOnce returns False at first + assert.False(t, ctrl.ShouldRunOnce(now)) + assert.False(t, ctrl.ShouldRunOnce(now.Add(100*time.Microsecond))) + + // But after MIN_INTERVAL we should run reconciliation + now = now.Add(MIN_INTERVAL) + assert.True(t, ctrl.ShouldRunOnce(now)) + + // But just one time + assert.False(t, ctrl.ShouldRunOnce(now)) + + // We should wait maximum possible time after last reconciliation started + now = now.Add(10*time.Minute - time.Second) + assert.False(t, ctrl.ShouldRunOnce(now)) + + // After exactly Interval it's OK again to reconcile + now = now.Add(time.Second) + assert.True(t, ctrl.ShouldRunOnce(now)) - close(stopChan) - close(handlerCh) - close(timeoutCh) + // But not two times + assert.False(t, ctrl.ShouldRunOnce(now)) } diff --git a/internal/testutils/mock_source.go b/internal/testutils/mock_source.go index e1d62980a8..d6acb9349d 100644 --- a/internal/testutils/mock_source.go +++ b/internal/testutils/mock_source.go @@ -17,6 +17,7 @@ limitations under the License. package testutils import ( + "context" "time" "github.com/stretchr/testify/mock" @@ -40,21 +41,18 @@ func (m *MockSource) Endpoints() ([]*endpoint.Endpoint, error) { return endpoints.([]*endpoint.Endpoint), args.Error(1) } -// AddEventHandler adds an event handler function that's called when sources that support such a thing have changed. -func (m *MockSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { - // Execute callback handler no more than once per minInterval, until a message on stopChan is received. +// AddEventHandler adds an event handler that should be triggered if something in source changes +func (m *MockSource) AddEventHandler(ctx context.Context, handler func()) { go func() { - var lastCallbackTime time.Time + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { select { - case <-stopChan: + case <-ctx.Done(): return - default: - now := time.Now() - if now.After(lastCallbackTime.Add(minInterval)) { - handler() - lastCallbackTime = time.Now() - } + case <-ticker.C: + handler() } } }() diff --git a/main.go b/main.go index 871ec1c97a..601a38a91d 100644 --- a/main.go +++ b/main.go @@ -89,12 +89,10 @@ func main() { } log.SetLevel(ll) - ctx := context.Background() - - stopChan := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(context.Background()) go serveMetrics(cfg.MetricsAddress) - go handleSigterm(stopChan) + go handleSigterm(cancel) // Create a source.Config from the flags passed by the user. sourceCfg := &source.Config{ @@ -323,13 +321,6 @@ func main() { DomainFilter: domainFilter, } - if cfg.UpdateEvents { - // Add RunOnce as the handler function that will be called when ingress/service sources have changed. - // Note that k8s Informers will perform an initial list operation, which results in the handler - // function initially being called for every Service/Ingress that exists limted by minInterval. - ctrl.Source.AddEventHandler(func() error { return ctrl.RunOnce(ctx) }, stopChan, 1*time.Minute) - } - if cfg.Once { err := ctrl.RunOnce(ctx) if err != nil { @@ -338,15 +329,24 @@ func main() { os.Exit(0) } - ctrl.Run(ctx, stopChan) + + if cfg.UpdateEvents { + // Add RunOnce as the handler function that will be called when ingress/service sources have changed. + // Note that k8s Informers will perform an initial list operation, which results in the handler + // function initially being called for every Service/Ingress that exists + ctrl.Source.AddEventHandler(ctx, func() { ctrl.ScheduleRunOnce(time.Now()) }) + } + + ctrl.ScheduleRunOnce(time.Now()) + ctrl.Run(ctx) } -func handleSigterm(stopChan chan struct{}) { +func handleSigterm(cancel func()) { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGTERM) <-signals log.Info("Received SIGTERM. Terminating...") - close(stopChan) + cancel() } func serveMetrics(address string) { diff --git a/pkg/k8sutils/async/bounded_frequency_runner.go b/pkg/k8sutils/async/bounded_frequency_runner.go deleted file mode 100644 index 8619dac15d..0000000000 --- a/pkg/k8sutils/async/bounded_frequency_runner.go +++ /dev/null @@ -1,238 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package async - -import ( - "fmt" - "sync" - "time" - - "k8s.io/client-go/util/flowcontrol" - "k8s.io/klog" -) - -// BoundedFrequencyRunner manages runs of a user-provided function. -// See NewBoundedFrequencyRunner for examples. -type BoundedFrequencyRunner struct { - name string // the name of this instance - minInterval time.Duration // the min time between runs, modulo bursts - maxInterval time.Duration // the max time between runs - - run chan struct{} // try an async run - - mu sync.Mutex // guards runs of fn and all mutations - fn func() // function to run - lastRun time.Time // time of last run - timer timer // timer for deferred runs - limiter rateLimiter // rate limiter for on-demand runs -} - -// designed so that flowcontrol.RateLimiter satisfies -type rateLimiter interface { - TryAccept() bool - Stop() -} - -type nullLimiter struct{} - -func (nullLimiter) TryAccept() bool { - return true -} - -func (nullLimiter) Stop() {} - -var _ rateLimiter = nullLimiter{} - -// for testing -type timer interface { - // C returns the timer's selectable channel. - C() <-chan time.Time - - // See time.Timer.Reset. - Reset(d time.Duration) bool - - // See time.Timer.Stop. - Stop() bool - - // See time.Now. - Now() time.Time - - // See time.Since. - Since(t time.Time) time.Duration - - // See time.Sleep. - Sleep(d time.Duration) -} - -// implement our timer in terms of std time.Timer. -type realTimer struct { - *time.Timer -} - -func (rt realTimer) C() <-chan time.Time { - return rt.Timer.C -} - -func (rt realTimer) Now() time.Time { - return time.Now() -} - -func (rt realTimer) Since(t time.Time) time.Duration { - return time.Since(t) -} - -func (rt realTimer) Sleep(d time.Duration) { - time.Sleep(d) -} - -var _ timer = realTimer{} - -// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance, -// which will manage runs of the specified function. -// -// All runs will be async to the caller of BoundedFrequencyRunner.Run, but -// multiple runs are serialized. If the function needs to hold locks, it must -// take them internally. -// -// Runs of the function will have at least minInterval between them (from -// completion to next start), except that up to bursts may be allowed. Burst -// runs are "accumulated" over time, one per minInterval up to burstRuns total. -// This can be used, for example, to mitigate the impact of expensive operations -// being called in response to user-initiated operations. Run requests that -// would violate the minInterval are coallesced and run at the next opportunity. -// -// The function will be run at least once per maxInterval. For example, this can -// force periodic refreshes of state in the absence of anyone calling Run. -// -// Examples: -// -// NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1) -// - fn will have at least 1 second between runs -// - fn will have no more than 5 seconds between runs -// -// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3) -// - fn will have at least 3 seconds between runs, with up to 3 burst runs -// - fn will have no more than 10 seconds between runs -// -// The maxInterval must be greater than or equal to the minInterval, If the -// caller passes a maxInterval less than minInterval, this function will panic. -func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { - timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately - <-timer.C() // consume the first tick - return construct(name, fn, minInterval, maxInterval, burstRuns, timer) -} - -// Make an instance with dependencies injected. -func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner { - if maxInterval < minInterval { - panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, minInterval, maxInterval)) - } - if timer == nil { - panic(fmt.Sprintf("%s: timer must be non-nil", name)) - } - - bfr := &BoundedFrequencyRunner{ - name: name, - fn: fn, - minInterval: minInterval, - maxInterval: maxInterval, - run: make(chan struct{}, 1), - timer: timer, - } - if minInterval == 0 { - bfr.limiter = nullLimiter{} - } else { - // allow burst updates in short succession - qps := float32(time.Second) / float32(minInterval) - bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer) - } - return bfr -} - -// Loop handles the periodic timer and run requests. This is expected to be -// called as a goroutine. -func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { - klog.V(3).Infof("%s Loop running", bfr.name) - bfr.timer.Reset(bfr.maxInterval) - for { - select { - case <-stop: - bfr.stop() - klog.V(3).Infof("%s Loop stopping", bfr.name) - return - case <-bfr.timer.C(): - bfr.tryRun() - case <-bfr.run: - bfr.tryRun() - } - } -} - -// Run the function as soon as possible. If this is called while Loop is not -// running, the call may be deferred indefinitely. -// If there is already a queued request to call the underlying function, it -// may be dropped - it is just guaranteed that we will try calling the -// underlying function as soon as possible starting from now. -func (bfr *BoundedFrequencyRunner) Run() { - // If it takes a lot of time to run the underlying function, noone is really - // processing elements from channel. So to avoid blocking here on the - // putting element to it, we simply skip it if there is already an element - // in it. - select { - case bfr.run <- struct{}{}: - default: - } -} - -// assumes the lock is not held -func (bfr *BoundedFrequencyRunner) stop() { - bfr.mu.Lock() - defer bfr.mu.Unlock() - bfr.limiter.Stop() - bfr.timer.Stop() -} - -// assumes the lock is not held -func (bfr *BoundedFrequencyRunner) tryRun() { - bfr.mu.Lock() - defer bfr.mu.Unlock() - - if bfr.limiter.TryAccept() { - // We're allowed to run the function right now. - bfr.fn() - bfr.lastRun = bfr.timer.Now() - bfr.timer.Stop() - bfr.timer.Reset(bfr.maxInterval) - klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval) - return - } - - // It can't run right now, figure out when it can run next. - - elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run - nextPossible := bfr.minInterval - elapsed // time to next possible run - nextScheduled := bfr.maxInterval - elapsed // time to next periodic run - klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled) - - if nextPossible < nextScheduled { - // Set the timer for ASAP, but don't drain here. Assuming Loop is running, - // it might get a delivery in the mean time, but that is OK. - bfr.timer.Stop() - bfr.timer.Reset(nextPossible) - klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible) - } -} diff --git a/source/cloudfoundry.go b/source/cloudfoundry.go index 8157e841bd..ffa79df9a3 100644 --- a/source/cloudfoundry.go +++ b/source/cloudfoundry.go @@ -17,8 +17,8 @@ limitations under the License. package source import ( + "context" "net/url" - "time" cfclient "github.com/cloudfoundry-community/go-cfclient" @@ -36,7 +36,7 @@ func NewCloudFoundrySource(cfClient *cfclient.Client) (Source, error) { }, nil } -func (rs *cloudfoundrySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (rs *cloudfoundrySource) AddEventHandler(ctx context.Context, handler func()) { } // Endpoints returns endpoint objects diff --git a/source/connector.go b/source/connector.go index 13cbe2e94d..0bd9479ee4 100644 --- a/source/connector.go +++ b/source/connector.go @@ -17,6 +17,7 @@ limitations under the License. package source import ( + "context" "encoding/gob" "net" "time" @@ -65,5 +66,5 @@ func (cs *connectorSource) Endpoints() ([]*endpoint.Endpoint, error) { return endpoints, nil } -func (cs *connectorSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (cs *connectorSource) AddEventHandler(ctx context.Context, handler func()) { } diff --git a/source/crd.go b/source/crd.go index d2ed5b0b26..80615ab73d 100644 --- a/source/crd.go +++ b/source/crd.go @@ -17,10 +17,10 @@ limitations under the License. package source import ( + "context" "fmt" "os" "strings" - "time" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -112,7 +112,7 @@ func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFi }, nil } -func (cs *crdSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) { } // Endpoints returns endpoint objects. diff --git a/source/dedup_source.go b/source/dedup_source.go index bf8c25887d..bfe5f8519d 100644 --- a/source/dedup_source.go +++ b/source/dedup_source.go @@ -17,7 +17,7 @@ limitations under the License. package source import ( - "time" + "context" log "github.com/sirupsen/logrus" @@ -59,6 +59,6 @@ func (ms *dedupSource) Endpoints() ([]*endpoint.Endpoint, error) { return result, nil } -func (ms *dedupSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { - ms.source.AddEventHandler(handler, stopChan, minInterval) +func (ms *dedupSource) AddEventHandler(ctx context.Context, handler func()) { + ms.source.AddEventHandler(ctx, handler) } diff --git a/source/empty.go b/source/empty.go index 415c8705bb..f731ae3649 100644 --- a/source/empty.go +++ b/source/empty.go @@ -17,7 +17,7 @@ limitations under the License. package source import ( - "time" + "context" "sigs.k8s.io/external-dns/endpoint" ) @@ -25,7 +25,7 @@ import ( // emptySource is a Source that returns no endpoints. type emptySource struct{} -func (e *emptySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (e *emptySource) AddEventHandler(ctx context.Context, handler func()) { } // Endpoints collects endpoints of all nested Sources and returns them in a single slice. diff --git a/source/fake.go b/source/fake.go index 85fb779c10..54a59b604a 100644 --- a/source/fake.go +++ b/source/fake.go @@ -21,6 +21,7 @@ Note: currently only supports IP targets (A records), not hostname targets package source import ( + "context" "fmt" "math/rand" "net" @@ -54,7 +55,7 @@ func NewFakeSource(fqdnTemplate string) (Source, error) { }, nil } -func (sc *fakeSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (sc *fakeSource) AddEventHandler(ctx context.Context, handler func()) { } // Endpoints returns endpoint objects. diff --git a/source/gateway.go b/source/gateway.go index c89369ad9c..aaf7563a59 100644 --- a/source/gateway.go +++ b/source/gateway.go @@ -18,6 +18,7 @@ package source import ( "bytes" + "context" "fmt" "sort" "strings" @@ -173,7 +174,7 @@ func (sc *gatewaySource) Endpoints() ([]*endpoint.Endpoint, error) { return endpoints, nil } -func (sc *gatewaySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (sc *gatewaySource) AddEventHandler(ctx context.Context, handler func()) { } func (sc *gatewaySource) endpointsFromTemplate(config *istiomodel.Config) ([]*endpoint.Endpoint, error) { diff --git a/source/ingress.go b/source/ingress.go index 0ffe5a2fb1..01972ec41e 100644 --- a/source/ingress.go +++ b/source/ingress.go @@ -18,6 +18,7 @@ package source import ( "bytes" + "context" "fmt" "sort" "strings" @@ -34,7 +35,6 @@ import ( "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" - "sigs.k8s.io/external-dns/pkg/k8sutils/async" ) const ( @@ -56,7 +56,6 @@ type ingressSource struct { combineFQDNAnnotation bool ignoreHostnameAnnotation bool ingressInformer extinformers.IngressInformer - runner *async.BoundedFrequencyRunner } // NewIngressSource creates a new ingressSource with the given config. @@ -298,30 +297,21 @@ func targetsFromIngressStatus(status v1beta1.IngressStatus) endpoint.Targets { return targets } -func (sc *ingressSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { - // Add custom resource event handler - log.Debug("Adding (bounded) event handler for ingress") +func (sc *ingressSource) AddEventHandler(ctx context.Context, handler func()) { + log.Debug("Adding event handler for ingress") - maxInterval := 24 * time.Hour // handler will be called if it has not run in 24 hours - burst := 2 // allow up to two handler burst calls - log.Debugf("Adding handler to BoundedFrequencyRunner with minInterval: %v, syncPeriod: %v, bursts: %d", - minInterval, maxInterval, burst) - sc.runner = async.NewBoundedFrequencyRunner("ingress-handler", func() { - _ = handler() - }, minInterval, maxInterval, burst) - go sc.runner.Loop(stopChan) - - // run the handler function as soon as the BoundedFrequencyRunner will allow when an update occurs + // Right now there is no way to remove event handler from informer, see: + // https://github.com/kubernetes/kubernetes/issues/79610 sc.ingressInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - sc.runner.Run() + handler() }, UpdateFunc: func(old interface{}, new interface{}) { - sc.runner.Run() + handler() }, DeleteFunc: func(obj interface{}) { - sc.runner.Run() + handler() }, }, ) diff --git a/source/ingressroute.go b/source/ingressroute.go index 9e5ee3cb49..846174f87b 100644 --- a/source/ingressroute.go +++ b/source/ingressroute.go @@ -18,6 +18,7 @@ package source import ( "bytes" + "context" "fmt" "sort" "strings" @@ -333,5 +334,5 @@ func parseContourLoadBalancerService(service string) (namespace, name string, er return } -func (sc *ingressRouteSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (sc *ingressRouteSource) AddEventHandler(ctx context.Context, handler func()) { } diff --git a/source/multi_source.go b/source/multi_source.go index 1f255ceb3b..494acd16b7 100644 --- a/source/multi_source.go +++ b/source/multi_source.go @@ -17,7 +17,7 @@ limitations under the License. package source import ( - "time" + "context" "sigs.k8s.io/external-dns/endpoint" ) @@ -43,9 +43,9 @@ func (ms *multiSource) Endpoints() ([]*endpoint.Endpoint, error) { return result, nil } -func (ms *multiSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (ms *multiSource) AddEventHandler(ctx context.Context, handler func()) { for _, s := range ms.children { - s.AddEventHandler(handler, stopChan, minInterval) + s.AddEventHandler(ctx, handler) } } diff --git a/source/node.go b/source/node.go index a573821065..d54ff890b9 100644 --- a/source/node.go +++ b/source/node.go @@ -18,6 +18,7 @@ package source import ( "bytes" + "context" "fmt" "strings" "text/template" @@ -167,7 +168,7 @@ func (ns *nodeSource) Endpoints() ([]*endpoint.Endpoint, error) { return endpointsSlice, nil } -func (ns *nodeSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (ns *nodeSource) AddEventHandler(ctx context.Context, handler func()) { } // nodeAddress returns node's externalIP and if that's not found, node's internalIP diff --git a/source/ocproute.go b/source/ocproute.go index 8a88da4d65..0f2ed30cfa 100644 --- a/source/ocproute.go +++ b/source/ocproute.go @@ -18,6 +18,7 @@ package source import ( "bytes" + "context" "fmt" "sort" "strings" @@ -109,7 +110,7 @@ func NewOcpRouteSource( } // TODO add a meaningful EventHandler -func (ors *ocpRouteSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +func (ors *ocpRouteSource) AddEventHandler(ctx context.Context, handler func()) { } // Endpoints returns endpoint objects for each host-target combination that should be processed. diff --git a/source/routegroup.go b/source/routegroup.go index e74b0875c9..a9dbe97cff 100644 --- a/source/routegroup.go +++ b/source/routegroup.go @@ -18,6 +18,7 @@ package source import ( "bytes" + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -239,7 +240,7 @@ func NewRouteGroupSource(timeout time.Duration, token, tokenPath, master, namesp } // AddEventHandler for routegroup is currently a no op, because we do not implement caching, yet. -func (sc *routeGroupSource) AddEventHandler(func() error, <-chan struct{}, time.Duration) {} +func (sc *routeGroupSource) AddEventHandler(ctx context.Context, handler func()) {} // Endpoints returns endpoint objects for each host-target combination that should be processed. // Retrieves all routeGroup resources on all namespaces. diff --git a/source/service.go b/source/service.go index 84b758cbd9..5c5c10905f 100644 --- a/source/service.go +++ b/source/service.go @@ -18,6 +18,7 @@ package source import ( "bytes" + "context" "fmt" "sort" "strings" @@ -35,7 +36,6 @@ import ( "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" - "sigs.k8s.io/external-dns/pkg/k8sutils/async" ) const ( @@ -65,7 +65,6 @@ type serviceSource struct { podInformer coreinformers.PodInformer nodeInformer coreinformers.NodeInformer serviceTypeFilter map[string]struct{} - runner *async.BoundedFrequencyRunner } // NewServiceSource creates a new serviceSource with the given config. @@ -593,30 +592,21 @@ func (sc *serviceSource) extractNodePortEndpoints(svc *v1.Service, nodeTargets e return endpoints } -func (sc *serviceSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { - // Add custom resource event handler - log.Debug("Adding (bounded) event handler for service") +func (sc *serviceSource) AddEventHandler(ctx context.Context, handler func()) { + log.Debug("Adding event handler for service") - maxInterval := 24 * time.Hour // handler will be called if it has not run in 24 hours - burst := 2 // allow up to two handler burst calls - log.Debugf("Adding handler to BoundedFrequencyRunner with minInterval: %v, syncPeriod: %v, bursts: %d", - minInterval, maxInterval, burst) - sc.runner = async.NewBoundedFrequencyRunner("service-handler", func() { - _ = handler() - }, minInterval, maxInterval, burst) - go sc.runner.Loop(stopChan) - - // run the handler function as soon as the BoundedFrequencyRunner will allow when an update occurs + // Right now there is no way to remove event handler from informer, see: + // https://github.com/kubernetes/kubernetes/issues/79610 sc.serviceInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - sc.runner.Run() + handler() }, UpdateFunc: func(old interface{}, new interface{}) { - sc.runner.Run() + handler() }, DeleteFunc: func(obj interface{}) { - sc.runner.Run() + handler() }, }, ) diff --git a/source/source.go b/source/source.go index 0b9004fd8e..5611d9f610 100644 --- a/source/source.go +++ b/source/source.go @@ -17,6 +17,7 @@ limitations under the License. package source import ( + "context" "fmt" "math" "net" @@ -62,9 +63,8 @@ const ( // Source defines the interface Endpoint sources should implement. type Source interface { Endpoints() ([]*endpoint.Endpoint, error) - // AddEventHandler adds an event handler function that's called when (supported) sources have changed. - // The handler should not be called more than than once per time.Duration and not again after stop channel is closed. - AddEventHandler(func() error, <-chan struct{}, time.Duration) + // AddEventHandler adds an event handler that should be triggered if something in source changes + AddEventHandler(context.Context, func()) } func getTTLFromAnnotations(annotations map[string]string) (endpoint.TTL, error) { diff --git a/source/source_test.go b/source/source_test.go index 04c3b46129..dbf6c90c79 100644 --- a/source/source_test.go +++ b/source/source_test.go @@ -17,12 +17,15 @@ limitations under the License. package source import ( + "context" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "sigs.k8s.io/external-dns/endpoint" + "sigs.k8s.io/external-dns/internal/testutils" ) func TestGetTTLFromAnnotations(t *testing.T) { @@ -105,3 +108,35 @@ func TestSuitableType(t *testing.T) { } } } + +// TestSourceEventHandler that AddEventHandler calls provided handler +func TestSourceEventHandler(t *testing.T) { + source := new(testutils.MockSource) + + handlerCh := make(chan bool) + + ctx, cancel := context.WithCancel(context.Background()) + + // Define and register a simple handler that sends a message to a channel to show it was called. + handler := func() { + handlerCh <- true + } + // Example of preventing handler from being called more than once every 5 seconds. + source.AddEventHandler(ctx, handler) + + // Send timeout message after 10 seconds to fail test if handler is not called. + go func() { + time.Sleep(10 * time.Second) + cancel() + }() + + // Wait until we either receive a message from handlerCh or timeoutCh channel after 10 seconds. + select { + case msg := <-handlerCh: + assert.True(t, msg) + case <-ctx.Done(): + assert.Fail(t, "timed out waiting for event handler to be called") + } + + close(handlerCh) +}