Skip to content

Commit

Permalink
Merge 032eced into 930a1cf
Browse files Browse the repository at this point in the history
  • Loading branch information
oklahomer committed Apr 10, 2022
2 parents 930a1cf + 032eced commit 6de5821
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 66 deletions.
133 changes: 70 additions & 63 deletions watchers/filewatcher.go
Expand Up @@ -59,7 +59,7 @@ type fileWatcher struct {

var _ sarah.ConfigWatcher = (*fileWatcher)(nil)

func (w *fileWatcher) Read(ctx context.Context, botType sarah.BotType, id string, configPtr interface{}) error {
func (w *fileWatcher) Read(_ context.Context, botType sarah.BotType, id string, configPtr interface{}) error {
configDir := filepath.Join(w.baseDir, strings.ToLower(botType.String()))
file := findPluginConfigFile(configDir, id)

Expand Down Expand Up @@ -125,7 +125,6 @@ func (w *fileWatcher) Unwatch(botType sarah.BotType) (err error) {
func (w *fileWatcher) run(ctx context.Context, events <-chan fsnotify.Event, errs <-chan error) {
subscriptions := map[string][]*subscription{}

OP:
for {
select {
case <-ctx.Done():
Expand All @@ -148,86 +147,94 @@ OP:
case event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create:
logger.Infof("Received %s event for %s.", event.Op.String(), event.Name)

configFile, err := plainPathToFile(event.Name)
if errors.Is(err, errUnableToDetermineConfigFileFormat) || errors.Is(err, errUnsupportedConfigFileFormat) {
// Irrelevant file is updated
continue OP
} else if err != nil {
logger.Warnf("Failed to locate %s: %+v", event.Name, err)
continue OP
}

watches, ok := subscriptions[configFile.absDir]
if !ok {
// No corresponding subscription is found for the directory
continue OP
}

// Notify all subscribers
for _, watch := range watches {
if watch.id == configFile.id {
watch.callback()
}
}
doHandleEvent(event, subscriptions)

default:
// Do nothing
logger.Debugf("Received %s event for %s.", event.Op.String(), event.Name)

}

case subscribe := <-w.subscribe:
logger.Infof("Start subscribing to %s", subscribe.absDir)

err := w.fsWatcher.Add(subscribe.absDir)
if err != nil {
subscribe.initErr <- err
continue OP
}

watches, ok := subscriptions[subscribe.absDir]
if !ok {
watches = []*subscription{}
}
for _, w := range watches {
if w.id == subscribe.id {
subscribe.initErr <- sarah.ErrAlreadySubscribing
continue OP
}
}
subscriptions[subscribe.absDir] = append(watches, subscribe)
subscribe.initErr <- nil
err := doSubscribe(w.fsWatcher, subscribe, subscriptions)
subscribe.initErr <- err // Include nil error

case botType := <-w.unsubscribe:
logger.Infof("Stop subscribing config files for %s", botType)

for dir, subscribeDirs := range subscriptions {
// Exclude all watches that are tied to given group, and stash those should be kept.
var remains []*subscription
for _, subscribeDir := range subscribeDirs {
if subscribeDir.botType != botType {
remains = append(remains, subscribeDir)
}
}

// If none should remain, stop subscribing to watch corresponding directory.
if len(remains) == 0 {
_ = w.fsWatcher.Remove(dir)
delete(subscriptions, dir)
continue OP
}

// If any remains, keep subscribing to the directory for remaining callbacks.
subscriptions[dir] = remains
}
doUnsubscribe(w.fsWatcher, botType, subscriptions)

case err := <-errs:
logger.Errorf("Error on subscribing to directory change: %+v", err)
}
}
}

func doHandleEvent(event fsnotify.Event, subscriptions map[string][]*subscription) {
configFile, err := plainPathToFile(event.Name)
if errors.Is(err, errUnableToDetermineConfigFileFormat) || errors.Is(err, errUnsupportedConfigFileFormat) {
// Irrelevant file is updated
return
} else if err != nil {
logger.Warnf("Failed to locate %s: %+v", event.Name, err)
return
}

watches, ok := subscriptions[configFile.absDir]
if !ok {
// No corresponding subscription is found for the directory
return
}

// Notify all subscribers
for _, watch := range watches {
if watch.id == configFile.id {
watch.callback()
}
}
}

func doSubscribe(a abstractFsWatcher, s *subscription, subscriptions map[string][]*subscription) error {
watches, ok := subscriptions[s.absDir]
if !ok {
// Initial subscription for the given dir
err := a.Add(s.absDir)
if err != nil {
return err
}

watches = []*subscription{}
}
for _, w := range watches {
if w.id == s.id {
return sarah.ErrAlreadySubscribing
}
}
subscriptions[s.absDir] = append(watches, s)
return nil
}

func doUnsubscribe(a abstractFsWatcher, botType sarah.BotType, subscriptions map[string][]*subscription) {
for dir, subscribeDirs := range subscriptions {
// Exclude all watches that are tied to given group, and stash those should be kept.
var remains []*subscription
for _, subscribeDir := range subscribeDirs {
if subscribeDir.botType != botType {
remains = append(remains, subscribeDir)
}
}

// If none should remain, stop subscribing to watch corresponding directory.
if len(remains) == 0 {
_ = a.Remove(dir)
delete(subscriptions, dir)
return
}

// If any remains, keep subscribing to the directory for remaining callbacks.
subscriptions[dir] = remains
}
}

type fileType uint

const (
Expand Down
178 changes: 175 additions & 3 deletions watchers/filewatcher_test.go
Expand Up @@ -202,6 +202,178 @@ func TestFileWatcher_Unwatch(t *testing.T) {
}
}

func Test_doSubscribe(t *testing.T) {
t.Run("Successful scenario", func(t *testing.T) {
added := 0
d := &dummyFsWatcher{
AddFunc: func(s string) error {
added++
return nil
},
}

subscriptions := map[string][]*subscription{}

t.Run("Initial subscription", func(t *testing.T) {
dir := filepath.Join("path", "to", "dummy", "dir", "A")
s := &subscription{
absDir: dir,
id: "id1",
}

err := doSubscribe(d, s, subscriptions)

if err != nil {
t.Errorf("Unexpected error is returned: %s", err.Error())
}

if len(subscriptions) != 1 {
t.Fatal("Subscription is not started.")
}

v, ok := subscriptions[dir]
if !ok {
t.Fatal("Subscription is not started.")
}

if len(v) != 1 {
t.Errorf("Unexpected number of subscription is started: %d", len(v))
}

if v[0].absDir != dir {
t.Errorf("Unexpected dir is subscribed: %s", v[0].absDir)
}

if added != 1 {
t.Errorf("Unexpected number of watcher calls: %d", added)
}
})

t.Run("Second subscription", func(t *testing.T) {
dir := filepath.Join("path", "to", "dummy", "dir", "B")
s := &subscription{
absDir: dir,
id: "id2",
}

err := doSubscribe(d, s, subscriptions)

if err != nil {
t.Errorf("Unexpected error is returned: %s", err.Error())
}

if len(subscriptions) != 2 {
t.Fatal("Subscription is not started.")
}

v, ok := subscriptions[dir]
if !ok {
t.Fatal("Subscription is not started.")
}

if len(v) != 1 {
t.Errorf("Unexpected number of subscription is started: %d", len(v))
}

if v[0].absDir != dir {
t.Errorf("Unexpected dir is subscribed: %s", v[0].absDir)
}

if added != 2 {
t.Errorf("Unexpected number of watcher calls: %d", added)
}
})

t.Run("Third subscription with the same dir as the second one", func(t *testing.T) {
dir := filepath.Join("path", "to", "dummy", "dir", "B")
s := &subscription{
absDir: dir,
id: "id3",
}

err := doSubscribe(d, s, subscriptions)

if err != nil {
t.Errorf("Unexpected error is returned: %s", err.Error())
}

if len(subscriptions) != 2 {
t.Fatal("Subscription is not started.")
}

v, ok := subscriptions[dir]
if !ok {
t.Fatal("Subscription is not started.")
}

// Multiple ids for a single dir
if len(v) != 2 {
t.Errorf("Unexpected number of subscription is started: %d", len(v))
}

// watcher.Add is not called for the duplicated dir
if added != 2 {
t.Errorf("Unexpected number of watcher calls: %d", added)
}
})
})

}

func Test_doUnsubscribe(t *testing.T) {
var botTypeA sarah.BotType = "dummyBotTypeA"
var botTypeB sarah.BotType = "dummyBotTypeB"
dir := filepath.Join("path", "to", "dummy", "dir")
subscriptions := map[string][]*subscription{
dir: {
&subscription{botType: botTypeA, id: "commandA"},
&subscription{botType: botTypeB, id: "commandB"},
},
}

var removeDir []string
d := &dummyFsWatcher{
RemoveFunc: func(s string) error {
removeDir = append(removeDir, s)
return nil
},
}

t.Run("Initial unsubscription", func(t *testing.T) {
doUnsubscribe(d, botTypeA, subscriptions)

v, ok := subscriptions[dir]
if !ok {
t.Fatal("Subscription to the basedir should stay at this point.")
}

if len(v) != 1 {
t.Fatalf("Subscription count should be decreased to 1: %d", len(v))
}

if v[0].botType != botTypeB {
t.Error("The subscription to dummyBotTypeB should stay.")
}

if len(removeDir) > 0 {
t.Errorf("Any directory should not be remved at this point: %d", len(removeDir))
}
})

t.Run("Second unsubscription", func(t *testing.T) {
doUnsubscribe(d, botTypeB, subscriptions)

_, ok := subscriptions[dir]
if ok {
t.Fatal("All subscription must be gone at this point.")
}

if len(removeDir) != 1 {
t.Errorf("Basedir must be removed at this point.")
}
})
}

func TestFileWatcher_run(t *testing.T) {
dir, _ := filepath.Abs(filepath.Join("..", "testdata", "config", "dummy"))
invalidDir, _ := filepath.Abs(filepath.Join("..", "testdata", "config", "invalid"))
Expand Down Expand Up @@ -236,12 +408,11 @@ func TestFileWatcher_run(t *testing.T) {
notify: nil,
},
}
add := make(chan struct{}, len(subscriptions))
testCnt := 0
remove := make(chan string, 2)
fsWatcher := &dummyFsWatcher{
AddFunc: func(absDir string) error {
add <- struct{}{}
s := subscriptions[len(add)-1]
s := subscriptions[testCnt-1]
return s.watcherErr
},
RemoveFunc: func(absDir string) error {
Expand Down Expand Up @@ -295,6 +466,7 @@ func TestFileWatcher_run(t *testing.T) {
// Two subscriptions are added for a directory
var botType sarah.BotType = "dummyBotType"
for i, s := range subscriptions {
testCnt++
copied := s // Not to refer the last element in the loop
err := make(chan error, 1)
w.subscribe <- &subscription{
Expand Down

0 comments on commit 6de5821

Please sign in to comment.