Skip to content

Commit

Permalink
Fix scheduling of reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
sheerun committed May 12, 2020
1 parent 6e0abfa commit 533281f
Show file tree
Hide file tree
Showing 21 changed files with 169 additions and 363 deletions.
40 changes: 33 additions & 7 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -112,6 +113,10 @@ type Controller struct {
Interval time.Duration
// The DomainFilter defines which DNS records to keep or exclude
DomainFilter endpoint.DomainFilter
// The nextRunAt used for throttling and batching reconciliation
nextRunAt time.Time
// The nextRunAtMux is for atomic updating of nextRunAt
nextRunAtMux sync.Mutex
}

// RunOnce runs a single iteration of a reconciliation loop.
Expand Down Expand Up @@ -154,18 +159,39 @@ func (c *Controller) RunOnce(ctx context.Context) error {
return nil
}

// Run runs RunOnce in a loop with a delay until stopChan receives a value.
func (c *Controller) Run(ctx context.Context, stopChan <-chan struct{}) {
ticker := time.NewTicker(c.Interval)
// MIN_INTERVAL is used as window for batching events
const MIN_INTERVAL = 5 * time.Second

// RunOnceThrottled makes sure execution happens at most once per interval.
func (c *Controller) ScheduleRunOnce(now time.Time) {
c.nextRunAtMux.Lock()
defer c.nextRunAtMux.Unlock()
c.nextRunAt = now.Add(MIN_INTERVAL)
}

func (c *Controller) ShouldRunOnce(now time.Time) bool {
c.nextRunAtMux.Lock()
defer c.nextRunAtMux.Unlock()
if now.Before(c.nextRunAt) {
return false
}
c.nextRunAt = now.Add(c.Interval)
return true
}

// Run runs RunOnce in a loop with a delay until context is cancelled
func (c *Controller) Run(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
err := c.RunOnce(ctx)
if err != nil {
log.Error(err)
if c.ShouldRunOnce(time.Now()) {
if err := c.RunOnce(ctx); err != nil {
log.Error(err)
}
}
select {
case <-ticker.C:
case <-stopChan:
case <-ctx.Done():
log.Info("Terminating main controller loop")
return
}
Expand Down
68 changes: 33 additions & 35 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,43 +153,41 @@ func TestRunOnce(t *testing.T) {
source.AssertExpectations(t)
}

// TestSourceEventHandler tests that the Controller can use a Source's registered handler as a callback.
func TestSourceEventHandler(t *testing.T) {
source := new(testutils.MockSource)
func TestShouldRunOnce(t *testing.T) {
ctrl := &Controller{Interval: 10 * time.Minute}

handlerCh := make(chan bool)
timeoutCh := make(chan bool, 1)
stopChan := make(chan struct{}, 1)
now := time.Now()

ctrl := &Controller{
Source: source,
Registry: nil,
Policy: &plan.SyncPolicy{},
}
// First run of Run loop should execute RunOnce
assert.True(t, ctrl.ShouldRunOnce(now))

// Define and register a simple handler that sends a message to a channel to show it was called.
handler := func() error {
handlerCh <- true
return nil
}
// Example of preventing handler from being called more than once every 5 seconds.
ctrl.Source.AddEventHandler(handler, stopChan, 5*time.Second)

// Send timeout message after 10 seconds to fail test if handler is not called.
go func() {
time.Sleep(10 * time.Second)
timeoutCh <- true
}()

// Wait until we either receive a message from handlerCh or timeoutCh channel after 10 seconds.
select {
case msg := <-handlerCh:
assert.True(t, msg)
case <-timeoutCh:
assert.Fail(t, "timed out waiting for event handler to be called")
}
// Second run should not
assert.False(t, ctrl.ShouldRunOnce(now))

now = now.Add(10 * time.Second)
// Changes happen in ingresses or services
ctrl.ScheduleRunOnce(now)
ctrl.ScheduleRunOnce(now)

// Because we batch changes, ShouldRunOnce returns False at first
assert.False(t, ctrl.ShouldRunOnce(now))
assert.False(t, ctrl.ShouldRunOnce(now.Add(100*time.Microsecond)))

// But after MIN_INTERVAL we should run reconciliation
now = now.Add(MIN_INTERVAL)
assert.True(t, ctrl.ShouldRunOnce(now))

// But just one time
assert.False(t, ctrl.ShouldRunOnce(now))

// We should wait maximum possible time after last reconciliation started
now = now.Add(10*time.Minute - time.Second)
assert.False(t, ctrl.ShouldRunOnce(now))

// After exactly Interval it's OK again to reconcile
now = now.Add(time.Second)
assert.True(t, ctrl.ShouldRunOnce(now))

close(stopChan)
close(handlerCh)
close(timeoutCh)
// But not two times
assert.False(t, ctrl.ShouldRunOnce(now))
}
20 changes: 9 additions & 11 deletions internal/testutils/mock_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package testutils

import (
"context"
"time"

"github.com/stretchr/testify/mock"
Expand All @@ -40,21 +41,18 @@ func (m *MockSource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints.([]*endpoint.Endpoint), args.Error(1)
}

// AddEventHandler adds an event handler function that's called when sources that support such a thing have changed.
func (m *MockSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
// Execute callback handler no more than once per minInterval, until a message on stopChan is received.
// AddEventHandler adds an event handler that should be triggered if something in source changes
func (m *MockSource) AddEventHandler(ctx context.Context, handler func()) {
go func() {
var lastCallbackTime time.Time
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-stopChan:
case <-ctx.Done():
return
default:
now := time.Now()
if now.After(lastCallbackTime.Add(minInterval)) {
handler()
lastCallbackTime = time.Now()
}
case <-ticker.C:
handler()
}
}
}()
Expand Down
28 changes: 14 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,10 @@ func main() {
}
log.SetLevel(ll)

ctx := context.Background()

stopChan := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())

go serveMetrics(cfg.MetricsAddress)
go handleSigterm(stopChan)
go handleSigterm(cancel)

// Create a source.Config from the flags passed by the user.
sourceCfg := &source.Config{
Expand Down Expand Up @@ -323,13 +321,6 @@ func main() {
DomainFilter: domainFilter,
}

if cfg.UpdateEvents {
// Add RunOnce as the handler function that will be called when ingress/service sources have changed.
// Note that k8s Informers will perform an initial list operation, which results in the handler
// function initially being called for every Service/Ingress that exists limted by minInterval.
ctrl.Source.AddEventHandler(func() error { return ctrl.RunOnce(ctx) }, stopChan, 1*time.Minute)
}

if cfg.Once {
err := ctrl.RunOnce(ctx)
if err != nil {
Expand All @@ -338,15 +329,24 @@ func main() {

os.Exit(0)
}
ctrl.Run(ctx, stopChan)

if cfg.UpdateEvents {
// Add RunOnce as the handler function that will be called when ingress/service sources have changed.
// Note that k8s Informers will perform an initial list operation, which results in the handler
// function initially being called for every Service/Ingress that exists
ctrl.Source.AddEventHandler(ctx, func() { ctrl.ScheduleRunOnce(time.Now()) })
}

ctrl.ScheduleRunOnce(time.Now())
ctrl.Run(ctx)
}

func handleSigterm(stopChan chan struct{}) {
func handleSigterm(cancel func()) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM)
<-signals
log.Info("Received SIGTERM. Terminating...")
close(stopChan)
cancel()
}

func serveMetrics(address string) {
Expand Down

0 comments on commit 533281f

Please sign in to comment.