diff --git a/Taskfile.yml b/Taskfile.yml index 3f73889..10d2127 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -30,9 +30,19 @@ tasks: upx ./bin/{{.binary}} fi + test:executor: + cmds: + - go test ./pkg/executor/... + + test:watcher: + env: + DEBUG: false + cmds: + - go test -json ./pkg/watcher/... | gotestfmt + build:dev: cmds: - - go build -o ./bin/fwatcher-dev ./cmd + - go build -o ./bin/fwatcher ./cmd example:http-server: cmds: diff --git a/cmd/main.go b/cmd/main.go index 09383dc..02909e1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,19 +2,14 @@ package main import ( "context" - "fmt" - "log/slog" "os" "os/exec" "os/signal" - "path/filepath" "strings" - "sync" "syscall" "time" "github.com/nxtcoder17/fwatcher/pkg/executor" - fn "github.com/nxtcoder17/fwatcher/pkg/functions" "github.com/nxtcoder17/fwatcher/pkg/logging" "github.com/nxtcoder17/fwatcher/pkg/watcher" "github.com/urfave/cli/v3" @@ -25,21 +20,11 @@ var ( Version string ) -// DefaultIgnoreList is list of directories that are mostly ignored -var DefaultIgnoreList = []string{ - ".git", ".svn", ".hg", // version control - ".idea", ".vscode", // IDEs - ".direnv", // direnv nix - "node_modules", // node - ".DS_Store", // macOS - ".log", // logs -} - func main() { cmd := &cli.Command{ Name: ProgramName, UseShortOptionHandling: true, - Usage: "simple tool to run commands on filesystem change events", + Usage: "a simple tool to run things on filesystem change events", ArgsUsage: "", Version: Version, Flags: []cli.Flag{ @@ -77,7 +62,7 @@ func main() { &cli.StringSliceFlag{ Name: "ignore-list", Usage: "disables ignoring from default ignore list", - Value: DefaultIgnoreList, + Value: watcher.DefaultIgnoreList, Aliases: []string{"I"}, }, @@ -92,17 +77,10 @@ func main() { Usage: "interactive mode, with stdin", }, - &cli.BoolFlag{ - Name: "sse", - Usage: "run watcher in sse mode", - }, - &cli.StringFlag{ Name: "sse-addr", HideDefault: false, - Usage: "run watcher in sse mode", - Sources: cli.ValueSourceChain{}, - Value: ":12345", + Usage: "run watcher with Server Side Events (SSE) enabled", }, }, Action: func(ctx context.Context, c *cli.Command) error { @@ -161,86 +139,39 @@ func main() { panic(err) } - var ex executor.Executor + var executors []executor.Executor - switch { - case c.Bool("sse"): - { - sseAddr := c.String("sse-addr") - ex = executor.NewSSEExecutor(executor.SSEExecutorArgs{Addr: sseAddr}) - logger.Info("HELLo world") - } - default: - { - execCmd := c.Args().First() - execArgs := c.Args().Tail() - ex = executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ - Logger: logger, - Interactive: c.Bool("interactive"), - Command: func(context.Context) *exec.Cmd { + if sseAddr := c.String("sse-addr"); sseAddr != "" { + executors = append(executors, executor.NewSSEExecutor(executor.SSEExecutorArgs{Addr: sseAddr})) + } + + if c.NArg() > 0 { + execCmd := c.Args().First() + execArgs := c.Args().Tail() + executors = append(executors, executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Interactive: c.Bool("interactive"), + Commands: []func(context.Context) *exec.Cmd{ + func(c context.Context) *exec.Cmd { cmd := exec.CommandContext(ctx, execCmd, execArgs...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.Stdin = os.Stdin return cmd }, - // IsInteractive: true, - }) - } + }, + })) } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if err := ex.Start(); err != nil { - slog.Error("got", "err", err) - } - logger.Debug("1. start-job finished") - }() - - counter := 0 - pwd := fn.Must(os.Getwd()) - - wg.Add(1) - go func() { - defer wg.Done() - w.Watch(ctx) - logger.Debug("2. watch context closed") - }() - - wg.Add(1) - go func() { - defer wg.Done() - <-ctx.Done() - ex.Stop() - logger.Debug("3. killed signal processed") - }() - - for event := range w.GetEvents() { - logger.Debug("received", "event", event) - relPath, err := filepath.Rel(pwd, event.Name) - if err != nil { - return err - } - counter += 1 - logger.Info(fmt.Sprintf("[RELOADING (%d)] due changes in %s", counter, relPath)) - - ex.OnWatchEvent(executor.Event{Source: event.Name}) + if err := w.WatchAndExecute(ctx, executors); err != nil { + return err } - // logger.Debug("stopping executor") - // if err := ex.Stop(); err != nil { - // return err - // } - // logger.Info("stopped executor") - - wg.Wait() return nil }, } - ctx, stop := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGABRT) + ctx, stop := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM) defer stop() if err := cmd.Run(ctx, os.Args); err != nil { diff --git a/flake.nix b/flake.nix index 6080261..ba7028f 100644 --- a/flake.nix +++ b/flake.nix @@ -20,6 +20,8 @@ pre-commit go_1_22 + gotestfmt + upx go-task ]; diff --git a/pkg/executor/cmd-executor.go b/pkg/executor/cmd-executor.go index e121c9c..5608c12 100644 --- a/pkg/executor/cmd-executor.go +++ b/pkg/executor/cmd-executor.go @@ -12,17 +12,18 @@ import ( type CmdExecutor struct { logger *slog.Logger parentCtx context.Context - newCmd func(context.Context) *exec.Cmd + commands []func(context.Context) *exec.Cmd interactive bool - mu sync.Mutex - abort func() + mu sync.Mutex + + kill func() error } type CmdExecutorArgs struct { Logger *slog.Logger - Command func(context.Context) *exec.Cmd + Commands []func(context.Context) *exec.Cmd Interactive bool } @@ -33,8 +34,8 @@ func NewCmdExecutor(ctx context.Context, args CmdExecutorArgs) *CmdExecutor { return &CmdExecutor{ parentCtx: ctx, - logger: args.Logger.With("component", "cmd-executor"), - newCmd: args.Command, + logger: args.Logger, + commands: args.Commands, mu: sync.Mutex{}, interactive: args.Interactive, } @@ -47,61 +48,91 @@ func (ex *CmdExecutor) OnWatchEvent(ev Event) error { return nil } -// Start implements Executor. -func (ex *CmdExecutor) Start() error { - ex.mu.Lock() - ctx, cf := context.WithCancel(ex.parentCtx) - ex.abort = cf - ex.mu.Unlock() - - cmd := ex.newCmd(ctx) - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - if ex.interactive { - cmd.Stdin = os.Stdin - cmd.SysProcAttr.Foreground = true +func killPID(pid int, logger ...*slog.Logger) error { + var l *slog.Logger + if len(logger) > 0 { + l = logger[0] + } else { + l = slog.Default() } - if err := cmd.Start(); err != nil { + l.Debug("about to kill", "process", pid) + if err := syscall.Kill(-pid, syscall.SIGKILL); err != nil { + if err == syscall.ESRCH { + return nil + } + l.Error("failed to kill, got", "err", err) return err } + return nil +} - done := make(chan error) - go func() { - done <- cmd.Wait() - }() +// Start implements Executor. +func (ex *CmdExecutor) Start() error { + ex.mu.Lock() + defer ex.mu.Unlock() + for i := range ex.commands { + if err := ex.parentCtx.Err(); err != nil { + return err + } - select { - case <-ctx.Done(): - ex.logger.Debug("process context done") - case err := <-done: - ex.logger.Debug("process wait completed, got", "err", err) - } + ctx, cf := context.WithCancel(ex.parentCtx) + defer cf() - ex.logger.Debug("process", "pid", cmd.Process.Pid) + cmd := ex.commands[i](ctx) + + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if ex.interactive { + cmd.Stdin = os.Stdin + cmd.SysProcAttr.Foreground = true + } - if ex.interactive { - // Send SIGTERM to the interactive process, as user will see it on his screen - proc, err := os.FindProcess(os.Getpid()) - if err != nil { + if err := cmd.Start(); err != nil { return err } - err = proc.Signal(syscall.SIGTERM) - if err != nil { - if err != syscall.ESRCH { - ex.logger.Error("failed to kill, got", "err", err) + logger := ex.logger.With("pid", cmd.Process.Pid, "command", i+1) + + ex.kill = func() error { + return killPID(cmd.Process.Pid, logger) + } + + go func() { + if err := cmd.Wait(); err != nil { + logger.Debug("process finished (wait completed), got", "err", err) + } + cf() + }() + + select { + case <-ctx.Done(): + logger.Debug("process finished (context cancelled)") + case <-ex.parentCtx.Done(): + logger.Debug("process finished (parent context cancelled)") + } + + if ex.interactive { + // Send SIGTERM to the interactive process, as user will see it on his screen + proc, err := os.FindProcess(os.Getpid()) + if err != nil { + return err + } + + err = proc.Signal(syscall.SIGTERM) + if err != nil { + if err != syscall.ESRCH { + logger.Error("failed to kill, got", "err", err) + return err + } return err } - return err } - } - if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { - if err == syscall.ESRCH { - return nil + if err := ex.kill(); err != nil { + return err } - ex.logger.Error("failed to kill, got", "err", err) - return err + + logger.Debug("command fully executed and processed") } return nil @@ -109,11 +140,9 @@ func (ex *CmdExecutor) Start() error { // Stop implements Executor. func (ex *CmdExecutor) Stop() error { - ex.mu.Lock() - if ex.abort != nil { - ex.abort() + if ex.kill != nil { + return ex.kill() } - ex.mu.Unlock() return nil } diff --git a/pkg/executor/cmd-executor_test.go b/pkg/executor/cmd-executor_test.go new file mode 100644 index 0000000..531d071 --- /dev/null +++ b/pkg/executor/cmd-executor_test.go @@ -0,0 +1,76 @@ +package executor + +import ( + "bytes" + "context" + "io" + "log/slog" + "os/exec" + "strings" + "testing" +) + +func Test_Exectuor_Start(t *testing.T) { + newCmd := func(stdout io.Writer, cmd string, args ...string) func(c context.Context) *exec.Cmd { + return func(c context.Context) *exec.Cmd { + cmd := exec.CommandContext(c, cmd, args...) + cmd.Stdout = stdout + return cmd + } + } + + tests := []struct { + name string + commands func(stdout io.Writer) []func(c context.Context) *exec.Cmd + output []string + }{ + // TODO: add your tests + { + name: "1. with single command", + commands: func(stdout io.Writer) []func(c context.Context) *exec.Cmd { + return []func(c context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + } + }, + output: []string{ + "hi", + }, + }, + { + name: "2. testing", + commands: func(stdout io.Writer) []func(c context.Context) *exec.Cmd { + return []func(c context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + } + }, + output: []string{ + "hi", + "hello", + }, + }, + } + + logger := slog.Default() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := new(bytes.Buffer) + ex := NewCmdExecutor(context.TODO(), CmdExecutorArgs{ + Logger: logger, + Commands: tt.commands(b), + Interactive: false, + }) + + if err := ex.Start(); err != nil { + t.Error(err) + } + + want := strings.Join(tt.output, "\n") + got := strings.TrimSpace(b.String()) + + if got != want { + t.Errorf("FAILED (%s)\n\t got: %s\n\twant: %s\n", tt.name, got, want) + } + }) + } +} diff --git a/pkg/executor/sse-executor.go b/pkg/executor/sse-executor.go index e9d8507..8ff4481 100644 --- a/pkg/executor/sse-executor.go +++ b/pkg/executor/sse-executor.go @@ -2,6 +2,7 @@ package executor import ( "encoding/json" + "errors" "fmt" "log/slog" "net/http" @@ -16,6 +17,8 @@ any client can connect to this event at /event type SSEExectuor struct { ch chan Event server *http.Server + + logger *slog.Logger } // OnWatchEvent implements Executor. @@ -31,7 +34,14 @@ func (s *SSEExectuor) OnWatchEvent(event Event) error { // Start implements Executor. func (s *SSEExectuor) Start() error { - return s.server.ListenAndServe() + s.logger.Info("Server Side Event notifier server started", "addr", s.server.Addr) + if err := s.server.ListenAndServe(); err != nil { + if errors.Is(err, http.ErrServerClosed) { + return nil + } + return err + } + return nil } // Stop implements Executor. @@ -43,6 +53,8 @@ var _ Executor = (*SSEExectuor)(nil) type SSEExecutorArgs struct { Addr string + + Logger *slog.Logger } func NewSSEExecutor(args SSEExecutorArgs) *SSEExectuor { @@ -68,10 +80,15 @@ func NewSSEExecutor(args SSEExecutorArgs) *SSEExectuor { } }) + logger := args.Logger + if logger == nil { + logger = slog.Default() + } + server := http.Server{ Addr: args.Addr, Handler: mux, } - return &SSEExectuor{ch: ch, server: &server} + return &SSEExectuor{ch: ch, server: &server, logger: logger} } diff --git a/pkg/watcher/lib.go b/pkg/watcher/lib.go new file mode 100644 index 0000000..e89a8d1 --- /dev/null +++ b/pkg/watcher/lib.go @@ -0,0 +1,88 @@ +package watcher + +import ( + "context" + "fmt" + "sync" + + "github.com/nxtcoder17/fwatcher/pkg/executor" +) + +func (f *Watcher) WatchAndExecute(ctx context.Context, executors []executor.Executor) error { + var wg sync.WaitGroup + + l := len(executors) + + for i := 0; i < l-1; i++ { + ex := executors[i] + + go func() { + <-ctx.Done() + ex.Stop() + }() + + switch ex.(type) { + case *executor.SSEExectuor: + { + wg.Add(1) + go func() { + defer wg.Done() + ex.Start() + }() + } + default: + { + if err := ex.Start(); err != nil { + return err + } + + // INFO: just for cleanup purposes + if err := ex.Stop(); err != nil { + return err + } + } + } + } + + ex := executors[l-1] + + wg.Add(1) + go func() { + defer wg.Done() + if err := ex.Start(); err != nil { + f.Logger.Error("got", "err", err) + } + f.Logger.Debug("final executor start finished") + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + ex.Stop() + f.Logger.Debug("2. context cancelled") + }() + + wg.Add(1) + go func() { + defer wg.Done() + f.Watch(ctx) + f.Logger.Debug("3. watcher closed") + }() + + counter := 0 + for event := range f.GetEvents() { + f.Logger.Debug("received", "event", event) + counter += 1 + + f.Logger.Info(fmt.Sprintf("[RELOADING (%d)] due changes in %s", counter, event.Name)) + + for i := range executors { + executors[i].OnWatchEvent(executor.Event{Source: event.Name}) + } + } + + wg.Wait() + + return nil +} diff --git a/pkg/watcher/lib_test.go b/pkg/watcher/lib_test.go new file mode 100644 index 0000000..10ab6dd --- /dev/null +++ b/pkg/watcher/lib_test.go @@ -0,0 +1,465 @@ +package watcher + +import ( + "bytes" + "context" + "io" + "log/slog" + "os" + "os/exec" + "strings" + "testing" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/nxtcoder17/fwatcher/pkg/executor" +) + +func Test_Watcher_WatchAndExecute(t *testing.T) { + logLevel := slog.LevelInfo + if os.Getenv("DEBUG") == "true" { + logLevel = slog.LevelDebug + } + + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: logLevel, + })) + + slog.SetDefault(logger) + + newCmd := func(stdout io.Writer, cmd string, args ...string) func(c context.Context) *exec.Cmd { + return func(c context.Context) *exec.Cmd { + cmd := exec.CommandContext(c, cmd, args...) + cmd.Stdout = stdout + return cmd + } + } + + tests := []struct { + name string + sendEvents func(ch chan Event) + executors func(ctx context.Context, stdout io.Writer) []executor.Executor + want []string + }{ + { + name: "1. single executor, single command", + sendEvents: func(ch chan Event) { + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + }, + }, + + { + name: "2. single executor, multiple commands", + sendEvents: func(ch chan Event) { + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + }, + }, + + { + name: "3. multiple executor, single command each", + sendEvents: func(ch chan Event) { + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + }, + }, + + { + name: "4. multiple executor, multiple commands each", + sendEvents: func(ch chan Event) { + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "no hi"), + newCmd(stdout, "echo", "no hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + "no hi", + "no hello", + }, + }, + + { + name: "5. single executor, single command, single change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + // this one after first event + "hi", + }, + }, + + { + name: "6. single executor, single command, multiple change events", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + + <-time.After(40 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + + // this one after first event + "hi", + + // this one after second event + "hi", + }, + }, + + { + name: "7. single executor, multiple commands, single change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + }, + }, + + { + name: "8. single executor, multiple commands, multiple change events", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + + // after second change event + "hi", + "hello", + }, + }, + + { + name: "9. multiple executor, single command, single change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + // after first change event + "hi", + }, + }, + + { + name: "10. multiple executor, single command, multiple change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + + // after first change event + "hi", + + // after second change event + "hi", + }, + }, + + { + name: "11. multiple executor, multiple commands, single change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + + // after second change event + "hi", + "hello", + }, + }, + + { + name: "12. multiple executor, multiple commands, multiple change events", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + }, + }, + + { + name: "13. multiple executor with SSE, multiple commands, multiple change events", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewSSEExecutor(executor.SSEExecutorArgs{ + Addr: ":8919", + Logger: logger, + }), + + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + eventCh := make(chan Event) + go tt.sendEvents(eventCh) + + b := new(bytes.Buffer) + + w, _ := fsnotify.NewWatcher() + + watcher := Watcher{ + watcher: w, + Logger: logger, + cooldownDuration: 5 * time.Millisecond, + eventsCh: eventCh, + } + + ctx, cf := context.WithTimeout(context.TODO(), 100*time.Millisecond) + // ctx, cf := context.WithCancel(context.TODO()) + defer cf() + + executors := tt.executors(ctx, b) + + if err := watcher.WatchAndExecute(ctx, executors); err != nil { + t.Error(err) + } + + want := strings.Join(tt.want, "\n") + got := strings.TrimSpace(b.String()) + + if got != want { + t.Errorf("FAILED (%s)\n\t got: %s\n\twant: %s\n", tt.name, got, want) + } + }) + } +} diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 7ddce22..7929da3 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -12,14 +12,7 @@ import ( "github.com/fsnotify/fsnotify" ) -type Watcher interface { - Close() error - RecursiveAdd(dir ...string) error - Watch(ctx context.Context) - GetEvents() chan Event -} - -type fsnWatcher struct { +type Watcher struct { watcher *fsnotify.Watcher directoryCount int @@ -27,16 +20,19 @@ type fsnWatcher struct { Logger *slog.Logger OnlySuffixes []string IgnoreSuffixes []string - ExcludeDirs map[string]struct{} - watchingDirs map[string]struct{} + + ExcludeDirs map[string]struct{} + watchingDirs map[string]struct{} cooldownDuration time.Duration eventsCh chan Event + + shouldLogWatchEvents bool } // GetEvents implements Watcher. -func (f *fsnWatcher) GetEvents() chan Event { +func (f *Watcher) GetEvents() chan Event { return f.eventsCh } @@ -50,7 +46,7 @@ var ( Chmod = fsnotify.Chmod ) -func (f fsnWatcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason string) { +func (f Watcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason string) { // INFO: any file change emits a chain of events, but // we can always expect a Write event out of that event chain if event.Op != fsnotify.Write { @@ -79,7 +75,6 @@ func (f fsnWatcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason strin for _, suffix := range f.IgnoreSuffixes { if strings.HasSuffix(event.Name, suffix) { - f.Logger.Debug("file is ignored", "file", event.Name) return true, fmt.Sprintf("because, file has suffix (%s), which is in ignore suffixes array(%+v)", suffix, f.IgnoreSuffixes) } } @@ -90,7 +85,6 @@ func (f fsnWatcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason strin matched := false for _, suffix := range f.OnlySuffixes { - f.Logger.Debug(fmt.Sprintf("[only-suffix] suffix: (%s), event.name: %s", suffix, event.Name)) if strings.HasSuffix(event.Name, suffix) { matched = true break @@ -103,7 +97,7 @@ func (f fsnWatcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason strin return true, "event ignored as suffix is not present in only-watch-suffixes" } -func (f *fsnWatcher) Watch(ctx context.Context) { +func (f *Watcher) Watch(ctx context.Context) { lastProcessingTime := time.Now() for { @@ -133,27 +127,39 @@ func (f *fsnWatcher) Watch(ctx context.Context) { } t := time.Now() - f.Logger.Debug(fmt.Sprintf("event %+v received", event)) + if f.shouldLogWatchEvents { + f.Logger.Debug(fmt.Sprintf("event %+v received", event)) + } if ignore, reason := f.ignoreEvent(event); ignore { - f.Logger.Debug("IGNORING", "event.name", event.Name, "reason", reason) + if f.shouldLogWatchEvents { + f.Logger.Debug("IGNORING", "event.name", event.Name, "reason", reason) + } continue } - f.Logger.Debug("PROCESSING", "event.name", event.Name, "event.op", event.Op.String()) + if f.shouldLogWatchEvents { + f.Logger.Debug("PROCESSING", "event.name", event.Name, "event.op", event.Op.String()) + } if time.Since(lastProcessingTime) < f.cooldownDuration { - f.Logger.Debug(fmt.Sprintf("too many events under %s, ignoring...", f.cooldownDuration.String()), "event.name", event.Name) + if f.shouldLogWatchEvents { + f.Logger.Debug(fmt.Sprintf("too many events under %s, ignoring...", f.cooldownDuration.String()), "event.name", event.Name) + } continue } f.eventsCh <- Event(event) - f.Logger.Debug("watch loop completed", "took", fmt.Sprintf("%dms", time.Since(t).Milliseconds())) + if f.shouldLogWatchEvents { + f.Logger.Debug("watch loop completed", "took", fmt.Sprintf("%dms", time.Since(t).Milliseconds())) + } } case <-ctx.Done(): - f.Logger.Debug("watcher is closing", "reason", "context closed") + if f.shouldLogWatchEvents { + f.Logger.Debug("watcher is closing", "reason", "context closed") + } close(f.eventsCh) f.watcher.Close() return @@ -161,7 +167,7 @@ func (f *fsnWatcher) Watch(ctx context.Context) { } } -func (f *fsnWatcher) RecursiveAdd(dirs ...string) error { +func (f *Watcher) RecursiveAdd(dirs ...string) error { for _, dir := range dirs { if _, ok := f.watchingDirs[dir]; ok { continue @@ -181,7 +187,9 @@ func (f *fsnWatcher) RecursiveAdd(dirs ...string) error { } if _, ok := f.ExcludeDirs[filepath.Base(dir)]; ok { - f.Logger.Debug("EXCLUDED from watchlist", "dir", dir) + if f.shouldLogWatchEvents { + f.Logger.Debug("EXCLUDED from watchlist", "dir", dir) + } continue } @@ -206,17 +214,19 @@ func (f *fsnWatcher) RecursiveAdd(dirs ...string) error { return nil } -func (f *fsnWatcher) addToWatchList(dir string) error { +func (f *Watcher) addToWatchList(dir string) error { if err := f.watcher.Add(dir); err != nil { f.Logger.Error("failed to add directory", "dir", dir, "err", err) return err } f.directoryCount++ - f.Logger.Debug("ADDED to watchlist", "dir", dir, "count", f.directoryCount) + if f.shouldLogWatchEvents { + f.Logger.Debug("ADDED to watchlist", "dir", dir, "count", f.directoryCount) + } return nil } -func (f *fsnWatcher) Close() error { +func (f *Watcher) Close() error { return f.watcher.Close() } @@ -232,9 +242,25 @@ type WatcherArgs struct { CooldownDuration *time.Duration Interactive bool + + ShouldLogWatchEvents bool } -func NewWatcher(ctx context.Context, args WatcherArgs) (Watcher, error) { +// DefaultIgnoreList is list of directories that are mostly ignored +var DefaultIgnoreList = []string{ + ".git", ".svn", ".hg", // version control + ".idea", ".vscode", // IDEs + ".direnv", // direnv nix + "node_modules", // node + ".DS_Store", // macOS + ".log", // logs +} + +var DefaultIgnoreExtensions = []string{ + ".log", +} + +func NewWatcher(ctx context.Context, args WatcherArgs) (*Watcher, error) { if args.Logger == nil { args.Logger = slog.Default() } @@ -249,10 +275,26 @@ func NewWatcher(ctx context.Context, args WatcherArgs) (Watcher, error) { excludeDirs := map[string]struct{}{} for _, dir := range args.IgnoreDirs { - args.Logger.Debug("EXCLUDED from watching", "dir", dir) + if args.ShouldLogWatchEvents { + args.Logger.Debug("EXCLUDED from watching", "dir", dir) + } excludeDirs[dir] = struct{}{} } + for _, dir := range args.WatchDirs { + if strings.HasPrefix(dir, "-") { + excludeDirs[dir[1:]] = struct{}{} + } + } + + args.IgnoreExtensions = append(args.IgnoreExtensions, DefaultIgnoreExtensions...) + + for _, ext := range args.WatchExtensions { + if strings.HasPrefix(ext, "-") { + excludeDirs[ext[1:]] = struct{}{} + } + } + watcher, err := fsnotify.NewWatcher() if err != nil { args.Logger.Error("failed to create watcher, got", "err", err) @@ -264,7 +306,7 @@ func NewWatcher(ctx context.Context, args WatcherArgs) (Watcher, error) { args.WatchDirs = append(args.WatchDirs, dir) } - fsw := &fsnWatcher{ + fsw := &Watcher{ watcher: watcher, Logger: args.Logger, ExcludeDirs: excludeDirs, @@ -273,7 +315,8 @@ func NewWatcher(ctx context.Context, args WatcherArgs) (Watcher, error) { cooldownDuration: cooldown, watchingDirs: make(map[string]struct{}), - eventsCh: make(chan Event), + shouldLogWatchEvents: args.ShouldLogWatchEvents, + eventsCh: make(chan Event), } if err := fsw.RecursiveAdd(args.WatchDirs...); err != nil {