From 032eced29a013995e8ef712df4322daeff06b3e1 Mon Sep 17 00:00:00 2001 From: Oklahomer Date: Sun, 10 Apr 2022 16:14:43 +0900 Subject: [PATCH] Separate filewatcher's run method into small functions --- watchers/filewatcher.go | 133 +++++++++++++------------- watchers/filewatcher_test.go | 178 ++++++++++++++++++++++++++++++++++- 2 files changed, 245 insertions(+), 66 deletions(-) diff --git a/watchers/filewatcher.go b/watchers/filewatcher.go index 8973c55..9150832 100644 --- a/watchers/filewatcher.go +++ b/watchers/filewatcher.go @@ -59,7 +59,7 @@ type fileWatcher struct { var _ sarah.ConfigWatcher = (*fileWatcher)(nil) -func (w *fileWatcher) Read(ctx context.Context, botType sarah.BotType, id string, configPtr interface{}) error { +func (w *fileWatcher) Read(_ context.Context, botType sarah.BotType, id string, configPtr interface{}) error { configDir := filepath.Join(w.baseDir, strings.ToLower(botType.String())) file := findPluginConfigFile(configDir, id) @@ -125,7 +125,6 @@ func (w *fileWatcher) Unwatch(botType sarah.BotType) (err error) { func (w *fileWatcher) run(ctx context.Context, events <-chan fsnotify.Event, errs <-chan error) { subscriptions := map[string][]*subscription{} -OP: for { select { case <-ctx.Done(): @@ -148,86 +147,94 @@ OP: case event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create: logger.Infof("Received %s event for %s.", event.Op.String(), event.Name) - configFile, err := plainPathToFile(event.Name) - if errors.Is(err, errUnableToDetermineConfigFileFormat) || errors.Is(err, errUnsupportedConfigFileFormat) { - // Irrelevant file is updated - continue OP - } else if err != nil { - logger.Warnf("Failed to locate %s: %+v", event.Name, err) - continue OP - } - - watches, ok := subscriptions[configFile.absDir] - if !ok { - // No corresponding subscription is found for the directory - continue OP - } - - // Notify all subscribers - for _, watch := range watches { - if watch.id == configFile.id { - watch.callback() - } - } + doHandleEvent(event, subscriptions) default: // Do nothing logger.Debugf("Received %s event for %s.", event.Op.String(), event.Name) - } case subscribe := <-w.subscribe: logger.Infof("Start subscribing to %s", subscribe.absDir) - - err := w.fsWatcher.Add(subscribe.absDir) - if err != nil { - subscribe.initErr <- err - continue OP - } - - watches, ok := subscriptions[subscribe.absDir] - if !ok { - watches = []*subscription{} - } - for _, w := range watches { - if w.id == subscribe.id { - subscribe.initErr <- sarah.ErrAlreadySubscribing - continue OP - } - } - subscriptions[subscribe.absDir] = append(watches, subscribe) - subscribe.initErr <- nil + err := doSubscribe(w.fsWatcher, subscribe, subscriptions) + subscribe.initErr <- err // Include nil error case botType := <-w.unsubscribe: logger.Infof("Stop subscribing config files for %s", botType) - - for dir, subscribeDirs := range subscriptions { - // Exclude all watches that are tied to given group, and stash those should be kept. - var remains []*subscription - for _, subscribeDir := range subscribeDirs { - if subscribeDir.botType != botType { - remains = append(remains, subscribeDir) - } - } - - // If none should remain, stop subscribing to watch corresponding directory. - if len(remains) == 0 { - _ = w.fsWatcher.Remove(dir) - delete(subscriptions, dir) - continue OP - } - - // If any remains, keep subscribing to the directory for remaining callbacks. - subscriptions[dir] = remains - } + doUnsubscribe(w.fsWatcher, botType, subscriptions) case err := <-errs: logger.Errorf("Error on subscribing to directory change: %+v", err) + } + } +} + +func doHandleEvent(event fsnotify.Event, subscriptions map[string][]*subscription) { + configFile, err := plainPathToFile(event.Name) + if errors.Is(err, errUnableToDetermineConfigFileFormat) || errors.Is(err, errUnsupportedConfigFileFormat) { + // Irrelevant file is updated + return + } else if err != nil { + logger.Warnf("Failed to locate %s: %+v", event.Name, err) + return + } + watches, ok := subscriptions[configFile.absDir] + if !ok { + // No corresponding subscription is found for the directory + return + } + + // Notify all subscribers + for _, watch := range watches { + if watch.id == configFile.id { + watch.callback() } } } +func doSubscribe(a abstractFsWatcher, s *subscription, subscriptions map[string][]*subscription) error { + watches, ok := subscriptions[s.absDir] + if !ok { + // Initial subscription for the given dir + err := a.Add(s.absDir) + if err != nil { + return err + } + + watches = []*subscription{} + } + for _, w := range watches { + if w.id == s.id { + return sarah.ErrAlreadySubscribing + } + } + subscriptions[s.absDir] = append(watches, s) + return nil +} + +func doUnsubscribe(a abstractFsWatcher, botType sarah.BotType, subscriptions map[string][]*subscription) { + for dir, subscribeDirs := range subscriptions { + // Exclude all watches that are tied to given group, and stash those should be kept. + var remains []*subscription + for _, subscribeDir := range subscribeDirs { + if subscribeDir.botType != botType { + remains = append(remains, subscribeDir) + } + } + + // If none should remain, stop subscribing to watch corresponding directory. + if len(remains) == 0 { + _ = a.Remove(dir) + delete(subscriptions, dir) + return + } + + // If any remains, keep subscribing to the directory for remaining callbacks. + subscriptions[dir] = remains + } +} + type fileType uint const ( diff --git a/watchers/filewatcher_test.go b/watchers/filewatcher_test.go index 6f34d58..cc2de53 100644 --- a/watchers/filewatcher_test.go +++ b/watchers/filewatcher_test.go @@ -202,6 +202,178 @@ func TestFileWatcher_Unwatch(t *testing.T) { } } +func Test_doSubscribe(t *testing.T) { + t.Run("Successful scenario", func(t *testing.T) { + added := 0 + d := &dummyFsWatcher{ + AddFunc: func(s string) error { + added++ + return nil + }, + } + + subscriptions := map[string][]*subscription{} + + t.Run("Initial subscription", func(t *testing.T) { + dir := filepath.Join("path", "to", "dummy", "dir", "A") + s := &subscription{ + absDir: dir, + id: "id1", + } + + err := doSubscribe(d, s, subscriptions) + + if err != nil { + t.Errorf("Unexpected error is returned: %s", err.Error()) + } + + if len(subscriptions) != 1 { + t.Fatal("Subscription is not started.") + } + + v, ok := subscriptions[dir] + if !ok { + t.Fatal("Subscription is not started.") + } + + if len(v) != 1 { + t.Errorf("Unexpected number of subscription is started: %d", len(v)) + } + + if v[0].absDir != dir { + t.Errorf("Unexpected dir is subscribed: %s", v[0].absDir) + } + + if added != 1 { + t.Errorf("Unexpected number of watcher calls: %d", added) + } + }) + + t.Run("Second subscription", func(t *testing.T) { + dir := filepath.Join("path", "to", "dummy", "dir", "B") + s := &subscription{ + absDir: dir, + id: "id2", + } + + err := doSubscribe(d, s, subscriptions) + + if err != nil { + t.Errorf("Unexpected error is returned: %s", err.Error()) + } + + if len(subscriptions) != 2 { + t.Fatal("Subscription is not started.") + } + + v, ok := subscriptions[dir] + if !ok { + t.Fatal("Subscription is not started.") + } + + if len(v) != 1 { + t.Errorf("Unexpected number of subscription is started: %d", len(v)) + } + + if v[0].absDir != dir { + t.Errorf("Unexpected dir is subscribed: %s", v[0].absDir) + } + + if added != 2 { + t.Errorf("Unexpected number of watcher calls: %d", added) + } + }) + + t.Run("Third subscription with the same dir as the second one", func(t *testing.T) { + dir := filepath.Join("path", "to", "dummy", "dir", "B") + s := &subscription{ + absDir: dir, + id: "id3", + } + + err := doSubscribe(d, s, subscriptions) + + if err != nil { + t.Errorf("Unexpected error is returned: %s", err.Error()) + } + + if len(subscriptions) != 2 { + t.Fatal("Subscription is not started.") + } + + v, ok := subscriptions[dir] + if !ok { + t.Fatal("Subscription is not started.") + } + + // Multiple ids for a single dir + if len(v) != 2 { + t.Errorf("Unexpected number of subscription is started: %d", len(v)) + } + + // watcher.Add is not called for the duplicated dir + if added != 2 { + t.Errorf("Unexpected number of watcher calls: %d", added) + } + }) + }) + +} + +func Test_doUnsubscribe(t *testing.T) { + var botTypeA sarah.BotType = "dummyBotTypeA" + var botTypeB sarah.BotType = "dummyBotTypeB" + dir := filepath.Join("path", "to", "dummy", "dir") + subscriptions := map[string][]*subscription{ + dir: { + &subscription{botType: botTypeA, id: "commandA"}, + &subscription{botType: botTypeB, id: "commandB"}, + }, + } + + var removeDir []string + d := &dummyFsWatcher{ + RemoveFunc: func(s string) error { + removeDir = append(removeDir, s) + return nil + }, + } + + t.Run("Initial unsubscription", func(t *testing.T) { + doUnsubscribe(d, botTypeA, subscriptions) + + v, ok := subscriptions[dir] + if !ok { + t.Fatal("Subscription to the basedir should stay at this point.") + } + + if len(v) != 1 { + t.Fatalf("Subscription count should be decreased to 1: %d", len(v)) + } + + if v[0].botType != botTypeB { + t.Error("The subscription to dummyBotTypeB should stay.") + } + + if len(removeDir) > 0 { + t.Errorf("Any directory should not be remved at this point: %d", len(removeDir)) + } + }) + + t.Run("Second unsubscription", func(t *testing.T) { + doUnsubscribe(d, botTypeB, subscriptions) + + _, ok := subscriptions[dir] + if ok { + t.Fatal("All subscription must be gone at this point.") + } + + if len(removeDir) != 1 { + t.Errorf("Basedir must be removed at this point.") + } + }) +} + func TestFileWatcher_run(t *testing.T) { dir, _ := filepath.Abs(filepath.Join("..", "testdata", "config", "dummy")) invalidDir, _ := filepath.Abs(filepath.Join("..", "testdata", "config", "invalid")) @@ -236,12 +408,11 @@ func TestFileWatcher_run(t *testing.T) { notify: nil, }, } - add := make(chan struct{}, len(subscriptions)) + testCnt := 0 remove := make(chan string, 2) fsWatcher := &dummyFsWatcher{ AddFunc: func(absDir string) error { - add <- struct{}{} - s := subscriptions[len(add)-1] + s := subscriptions[testCnt-1] return s.watcherErr }, RemoveFunc: func(absDir string) error { @@ -295,6 +466,7 @@ func TestFileWatcher_run(t *testing.T) { // Two subscriptions are added for a directory var botType sarah.BotType = "dummyBotType" for i, s := range subscriptions { + testCnt++ copied := s // Not to refer the last element in the loop err := make(chan error, 1) w.subscribe <- &subscription{