Skip to content

Commit

Permalink
debounce config change notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
btoews committed Apr 4, 2024
1 parent d396d0f commit d3846a9
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
26 changes: 25 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,13 @@ func (cfg *Config) Watch(ctx context.Context) (chan *Config, error) {
cfg.subs = nil
}()

var (
notifyCtx context.Context
cancelNotify context.CancelFunc = func() {}
cancelLastNotify *context.CancelFunc = &cancelNotify
)
defer func() { (*cancelLastNotify)() }()

for {
select {
case e, open := <-watch.Events:
Expand All @@ -263,7 +270,17 @@ func (cfg *Config) Watch(ctx context.Context) (chan *Config, error) {
continue
}

go cfg.notifySubs(ctx)
// Debounce change notifications: notifySubs sleeps for 50ms
// before notifying subs. If we get another change before
// that, we preempt the previous notification attempt. This
// is necessary because we receive multiple notifications
// for a single config change on windows and the first event
// fires before the change is available to be read.
(*cancelLastNotify)()
notifyCtx, cancelNotify = context.WithCancel(ctx)
cancelLastNotify = &cancelNotify

go cfg.notifySubs(notifyCtx)
case err := <-watch.Errors:
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand Down Expand Up @@ -302,6 +319,13 @@ func (cfg *Config) Unwatch(sub chan *Config) {
}

func (cfg *Config) notifySubs(ctx context.Context) {
// sleep for 50ms to facilitate debouncing (described above)
select {
case <-ctx.Done():
return
case <-time.After(50 * time.Millisecond):
}

newCfg, err := Load(ctx, cfg.path)
if err != nil {
return
Expand Down
18 changes: 16 additions & 2 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,25 @@ func TestConfigWatch(t *testing.T) {
require.NoError(t, os.WriteFile(path, []byte(`access_token: fo1_bar`), 0644))

cfgs, errs = getConfigChanges(c1, c2)
require.Equal(t, 0, len(errs))
require.Equal(t, 0, len(errs), errs)
require.Equal(t, 2, len(cfgs))
require.Equal(t, cfgs[0], cfgs[1])
require.Equal(t, "fo1_bar", cfgs[0].Tokens.All())

// debouncing
require.NoError(t, os.WriteFile(path, []byte(`access_token: fo1_aaa`), 0644))
require.NoError(t, os.WriteFile(path, []byte(`access_token: fo1_bbb`), 0644))

cfgs, errs = getConfigChanges(c1, c2)
require.Equal(t, 0, len(errs))
require.Equal(t, 2, len(cfgs))
require.Equal(t, cfgs[0], cfgs[1])
require.Equal(t, "fo1_bbb", cfgs[0].Tokens.All())

cfgs, errs = getConfigChanges(c1, c2)
require.Equal(t, 2, len(errs))
require.Equal(t, 0, len(cfgs))

cfg.Unwatch(c1)

require.NoError(t, os.WriteFile(path, []byte(`access_token: fo1_baz`), 0644))
Expand Down Expand Up @@ -108,7 +122,7 @@ func getConfigChanges(chans ...chan *Config) ([]*Config, []error) {
} else {
errs = append(errs, errors.New("closed"))
}
case <-time.After(50 * time.Millisecond):
case <-time.After(100 * time.Millisecond):
m.Lock()
errs = append(errs, errors.New("timeout"))
}
Expand Down

0 comments on commit d3846a9

Please sign in to comment.