From d3846a93e228d30ac3bd4736b8809965d62eec6f Mon Sep 17 00:00:00 2001 From: btoews Date: Thu, 4 Apr 2024 14:26:26 -0600 Subject: [PATCH] debounce config change notifications --- internal/config/config.go | 26 +++++++++++++++++++++++++- internal/config/config_test.go | 18 ++++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 6bd12b2db3..e2e9567c5e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -252,6 +252,13 @@ func (cfg *Config) Watch(ctx context.Context) (chan *Config, error) { cfg.subs = nil }() + var ( + notifyCtx context.Context + cancelNotify context.CancelFunc = func() {} + cancelLastNotify *context.CancelFunc = &cancelNotify + ) + defer func() { (*cancelLastNotify)() }() + for { select { case e, open := <-watch.Events: @@ -263,7 +270,17 @@ func (cfg *Config) Watch(ctx context.Context) (chan *Config, error) { continue } - go cfg.notifySubs(ctx) + // Debounce change notifications: notifySubs sleeps for 50ms + // before notifying subs. If we get another change before + // that, we preempt the previous notification attempt. This + // is necessary because we receive multiple notifications + // for a single config change on windows and the first event + // fires before the change is available to be read. + (*cancelLastNotify)() + notifyCtx, cancelNotify = context.WithCancel(ctx) + cancelLastNotify = &cancelNotify + + go cfg.notifySubs(notifyCtx) case err := <-watch.Errors: cfg.mu.Lock() defer cfg.mu.Unlock() @@ -302,6 +319,13 @@ func (cfg *Config) Unwatch(sub chan *Config) { } func (cfg *Config) notifySubs(ctx context.Context) { + // sleep for 50ms to facilitate debouncing (described above) + select { + case <-ctx.Done(): + return + case <-time.After(50 * time.Millisecond): + } + newCfg, err := Load(ctx, cfg.path) if err != nil { return diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 21a78b7e15..56a6d5725e 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -49,11 +49,25 @@ func TestConfigWatch(t *testing.T) { require.NoError(t, os.WriteFile(path, []byte(`access_token: fo1_bar`), 0644)) cfgs, errs = getConfigChanges(c1, c2) - require.Equal(t, 0, len(errs)) + require.Equal(t, 0, len(errs), errs) require.Equal(t, 2, len(cfgs)) require.Equal(t, cfgs[0], cfgs[1]) require.Equal(t, "fo1_bar", cfgs[0].Tokens.All()) + // debouncing + require.NoError(t, os.WriteFile(path, []byte(`access_token: fo1_aaa`), 0644)) + require.NoError(t, os.WriteFile(path, []byte(`access_token: fo1_bbb`), 0644)) + + cfgs, errs = getConfigChanges(c1, c2) + require.Equal(t, 0, len(errs)) + require.Equal(t, 2, len(cfgs)) + require.Equal(t, cfgs[0], cfgs[1]) + require.Equal(t, "fo1_bbb", cfgs[0].Tokens.All()) + + cfgs, errs = getConfigChanges(c1, c2) + require.Equal(t, 2, len(errs)) + require.Equal(t, 0, len(cfgs)) + cfg.Unwatch(c1) require.NoError(t, os.WriteFile(path, []byte(`access_token: fo1_baz`), 0644)) @@ -108,7 +122,7 @@ func getConfigChanges(chans ...chan *Config) ([]*Config, []error) { } else { errs = append(errs, errors.New("closed")) } - case <-time.After(50 * time.Millisecond): + case <-time.After(100 * time.Millisecond): m.Lock() errs = append(errs, errors.New("timeout")) }