diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d22fd2f --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/radovskyb/watcher + +go 1.16 diff --git a/watcher.go b/watcher.go index 4da4dfe..450f34d 100644 --- a/watcher.go +++ b/watcher.go @@ -22,6 +22,11 @@ var ( // from previously calling Start and not yet calling Close. ErrWatcherRunning = errors.New("error: watcher is already running") + // ErrWatcherNotRunning occurs when trying to perform a ScanNow + // when the watcher is not running. It will also occur if Close is called + // whilst a ScanNow() is running / pending. + ErrWatcherNotRunning = errors.New("error: watcher is not running") + // ErrWatchedFileDeleted is an error that occurs when a file or folder that was // being watched has been deleted. ErrWatchedFileDeleted = errors.New("error: watched file or folder deleted") @@ -129,6 +134,7 @@ type Watcher struct { ops map[Op]struct{} // Op filtering. ignoreHidden bool // ignore hidden files or not. maxEvents int // max sent events per cycle + scanNow chan chan struct{} // allows requests for immediate synchronous scans } // New creates a new Watcher. @@ -147,6 +153,7 @@ func New() *Watcher { files: make(map[string]os.FileInfo), ignored: make(map[string]struct{}), names: make(map[string]bool), + scanNow: make(chan chan struct{}), } } @@ -308,16 +315,6 @@ func (w *Watcher) listRecursive(name string) (map[string]os.FileInfo, error) { return err } - for _, f := range w.ffh { - err := f(info, path) - if err == ErrSkip { - return nil - } - if err != nil { - return err - } - } - // If path is ignored and it's a directory, skip the directory. If it's // ignored and it's a single file, skip the file. 
_, ignored := w.ignored[path] @@ -333,6 +330,17 @@ func (w *Watcher) listRecursive(name string) (map[string]os.FileInfo, error) { } return nil } + + for _, f := range w.ffh { + err := f(info, path) + if err == ErrSkip { + return nil + } + if err != nil { + return err + } + } + // Add the path and it's info to the file list. fileList[path] = info return nil @@ -433,7 +441,7 @@ func (w *Watcher) WatchedFiles() map[string]os.FileInfo { w.mu.Lock() defer w.mu.Unlock() - files := make(map[string]os.FileInfo) + files := make(map[string]os.FileInfo, len(w.files)) for k, v := range w.files { files[k] = v } @@ -497,9 +505,12 @@ func (w *Watcher) retrieveFileList() map[string]os.FileInfo { if err != nil { if os.IsNotExist(err) { w.mu.Unlock() - if name == err.(*os.PathError).Path { - w.Error <- ErrWatchedFileDeleted - w.RemoveRecursive(name) + pathErr, ok := err.(*os.PathError) + if ok { + if name == pathErr.Path { + w.Error <- ErrWatchedFileDeleted + w.RemoveRecursive(name) + } } w.mu.Lock() } else { @@ -511,9 +522,12 @@ func (w *Watcher) retrieveFileList() map[string]os.FileInfo { if err != nil { if os.IsNotExist(err) { w.mu.Unlock() - if name == err.(*os.PathError).Path { - w.Error <- ErrWatchedFileDeleted - w.Remove(name) + pathErr, ok := err.(*os.PathError) + if ok { + if name == pathErr.Path { + w.Error <- ErrWatchedFileDeleted + w.Remove(name) + } } w.mu.Lock() } else { @@ -550,6 +564,10 @@ func (w *Watcher) Start(d time.Duration) error { // Unblock w.Wait(). w.wg.Done() + var scanNowRequest chan struct{} + + defer close(w.Closed) + for { // done lets the inner polling cycle loop know when the // current cycle's method has finished executing. @@ -564,6 +582,7 @@ func (w *Watcher) Start(d time.Duration) error { // cancel can be used to cancel the current event polling function. cancel := make(chan struct{}) + defer close(cancel) // Look for events. 
go func() { @@ -578,8 +597,6 @@ func (w *Watcher) Start(d time.Duration) error { for { select { case <-w.close: - close(cancel) - close(w.Closed) return nil case event := <-evt: if len(w.ops) > 0 { // Filter Ops. @@ -589,7 +606,7 @@ func (w *Watcher) Start(d time.Duration) error { } } numEvents++ - if w.maxEvents > 0 && numEvents > w.maxEvents { + if scanNowRequest == nil && w.maxEvents > 0 && numEvents > w.maxEvents { close(cancel) break inner } @@ -604,8 +621,53 @@ func (w *Watcher) Start(d time.Duration) error { w.files = fileList w.mu.Unlock() + if scanNowRequest != nil { + close(scanNowRequest) + scanNowRequest = nil + } + // Sleep and then continue to the next loop iteration. - time.Sleep(d) + // If a request to do a full scan is received, handle it and then signal to the requester it is complete. + select { + case <-w.close: // break out of wait early if we get a Close + return nil + case scanNowRequest = <-w.scanNow: // sync scan request received + case <-time.After(d): // periodic re-roll time elapsed + } + } +} + +// ScanNow can be called on an already running watcher to perform an immediate synchronous scan of all watched files +// and generate the events for any changes. When ScanNow() returns to the caller, all events for any changed files +// have been published. ScanNow() can be used when you know FS changes have occurred and you want to ensure all events +// for the changes have been gathered before continuing, for example, to better process batched updates to groups of +// files. +// You can also specify a very long poll duration and then use ScanNow() to break from the poll wait and perform a scan +// before going back to sleep. 
+func (w *Watcher) ScanNow() error { + w.mu.Lock() + if !w.running { + w.mu.Unlock() + return ErrWatcherNotRunning + } + w.mu.Unlock() + + scanComplete := make(chan struct{}) + select { + case w.scanNow <- scanComplete: + case <-w.close: + // if the watcher is no longer running, or is closed whilst we're waiting for our scan to be accepted, return + // an error + return ErrWatcherNotRunning + } + + select { + case <-w.close: + // if the watcher is closed whilst we're waiting for our scan to complete, return an error + return ErrWatcherNotRunning + case <-scanComplete: + // scan completed ok + return nil } } @@ -633,7 +695,7 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, evt chan Event, creates[path] = info continue } - if oldInfo.ModTime() != info.ModTime() { + if oldInfo.ModTime() != info.ModTime() || oldInfo.Size() != info.Size() { select { case <-cancel: return @@ -700,6 +762,7 @@ func (w *Watcher) Wait() { } // Close stops a Watcher and unlocks its mutex, then sends a close signal. +// Note, it is not safe to Start() a Watcher again after closing it. You must create a new Watcher. func (w *Watcher) Close() { w.mu.Lock() if !w.running { @@ -711,5 +774,6 @@ func (w *Watcher) Close() { w.names = make(map[string]bool) w.mu.Unlock() // Send a close signal to the Start method. - w.close <- struct{}{} + // Use a channel close() rather than a channel write, so that ScanNow() can react to the closure also. + close(w.close) } diff --git a/watcher_test.go b/watcher_test.go index 4453b99..24bbc07 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -539,6 +539,161 @@ func TestTriggerEvent(t *testing.T) { wg.Wait() } +func TestScanNow(t *testing.T) { + testDir, teardown := setup(t) + defer teardown() + + w := New() + w.FilterOps(Create) + + // Add the testDir to the watchlist. 
+ if err := w.AddRecursive(testDir); err != nil { + t.Fatal(err) + } + + // should not be able to ScanNow() before the watcher is started + if err := w.ScanNow(); err != ErrWatcherNotRunning { + t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one") + } + + testFilePath := filepath.Join(testDir, "test_file1.txt") + done := make(chan struct{}) + go func() { + evt := <-w.Event + if evt.Op == Create && evt.Path == testFilePath { + close(done) + } else { + t.Fatal("unexpected event") + } + }() + + // Start scanning with a very long poll duration + go func() { + if err := w.Start(time.Hour); err != nil { + t.Fatal(err) + } + }() + + w.Wait() + defer w.Close() + + // perform initial scan, which should yield no changes + // this ensures the initial scan has happened, and means the watcher is now waiting 1hr before scanning again + if err := w.ScanNow(); err != nil { + t.Error(err) + } + + // wait for a short period just to ensure no unexpected events arrive + select { + case <-time.After(time.Millisecond * 100): + case <-done: + t.Fatal("should not have received an event as no changes have occurred since ScanNow() completed") + } + + // create the test file, we will not receive events due to the 1hr poll duration + if err := ioutil.WriteFile(testFilePath, []byte{}, 0755); err != nil { + t.Error(err) + } + + // wait for a short period just to ensure no unexpected events arrive now we've changed a file + select { + case <-time.After(time.Millisecond * 100): + case <-done: + t.Fatal("should not have received an event as a poll duration of 1 hour is used") + } + + // issue a scan now, and we will receive the events while ScanNow() is running. 
+ if err := w.ScanNow(); err != nil { + t.Error(err) + } + + // all events should have been received *whilst* ScanNow() was running, so the done channel should already be + // closed + select { + case <-done: + default: + t.Fatal("events from ScanNow() should have been received before ScanNow() returned") + } + + w.Close() + + // issue a scan now after closing, should error + if err := w.ScanNow(); err != ErrWatcherNotRunning { + t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one") + } +} + +func TestSizeOnlyChange(t *testing.T) { + testDir, teardown := setup(t) + defer teardown() + + w := New() + w.FilterOps(Write) + + // Add only the test file (not the whole testDir) to the watchlist. + testFilePath := filepath.Join(testDir, "file.txt") + if err := w.Add(testFilePath); err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + go func() { + evt := <-w.Event + if evt.Op == Write && evt.Path == testFilePath { + close(done) + } else { + t.Fatal("unexpected event") + } + }() + + // Start scanning with a very long poll duration + go func() { + if err := w.Start(time.Hour); err != nil { + t.Fatal(err) + } + }() + + w.Wait() + defer w.Close() + + // perform initial scan, which should yield no changes + // this ensures the initial scan has happened, and means the watcher is now waiting 1hr before scanning again + if err := w.ScanNow(); err != nil { + t.Error(err) + } + + // modify the test file, we will not receive events due to the 1hr poll duration + // when modifying, we ensure the mod time does not change. this tests the situation where a file system has to + // be able to detect multiple file changes within its mod time resolution, which on some systems can be 1 or 2 + // seconds. the watcher should detect the change because the size of the file has changed. 
+ stat, err := os.Stat(testFilePath) + if err != nil { + t.Error(err) + } + if err = ioutil.WriteFile(testFilePath, []byte("bigger than before"), 0755); err != nil { + t.Error(err) + } + if err = os.Chtimes(testFilePath, stat.ModTime(), stat.ModTime()); err != nil { + t.Error(err) + } + + // issue a scan now, and we will receive the events while ScanNow() is running. + if err := w.ScanNow(); err != nil { + t.Error(err) + } + + // all events should have been received *whilst* ScanNow() was running, but our handler may still be processing + // the event, so we'll wait up to one second for the done channel to be + // closed + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Did not detect a size only change (no mod time change)") + } + + w.Close() +} + func TestEventAddFile(t *testing.T) { testDir, teardown := setup(t) defer teardown()