Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge upstream #112

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/radovskyb/watcher

go 1.16
110 changes: 87 additions & 23 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ var (
// from previously calling Start and not yet calling Close.
ErrWatcherRunning = errors.New("error: watcher is already running")

// ErrWatcherNotRunning occurs when trying to perform a ScanNow
// when the watcher is not running. It will also occur if Close is called
// whilst a ScanNow() is running / pending.
ErrWatcherNotRunning = errors.New("error: watcher is not running")

// ErrWatchedFileDeleted is an error that occurs when a file or folder that was
// being watched has been deleted.
ErrWatchedFileDeleted = errors.New("error: watched file or folder deleted")
Expand Down Expand Up @@ -129,6 +134,7 @@ type Watcher struct {
ops map[Op]struct{} // Op filtering.
ignoreHidden bool // ignore hidden files or not.
maxEvents int // max sent events per cycle
scanNow chan chan struct{} // allows requests for immediate synchronous scans
}

// New creates a new Watcher.
Expand All @@ -147,6 +153,7 @@ func New() *Watcher {
files: make(map[string]os.FileInfo),
ignored: make(map[string]struct{}),
names: make(map[string]bool),
scanNow: make(chan chan struct{}),
}
}

Expand Down Expand Up @@ -308,16 +315,6 @@ func (w *Watcher) listRecursive(name string) (map[string]os.FileInfo, error) {
return err
}

for _, f := range w.ffh {
err := f(info, path)
if err == ErrSkip {
return nil
}
if err != nil {
return err
}
}

// If path is ignored and it's a directory, skip the directory. If it's
// ignored and it's a single file, skip the file.
_, ignored := w.ignored[path]
Expand All @@ -333,6 +330,17 @@ func (w *Watcher) listRecursive(name string) (map[string]os.FileInfo, error) {
}
return nil
}

for _, f := range w.ffh {
err := f(info, path)
if err == ErrSkip {
return nil
}
if err != nil {
return err
}
}

// Add the path and it's info to the file list.
fileList[path] = info
return nil
Expand Down Expand Up @@ -433,7 +441,7 @@ func (w *Watcher) WatchedFiles() map[string]os.FileInfo {
w.mu.Lock()
defer w.mu.Unlock()

files := make(map[string]os.FileInfo)
files := make(map[string]os.FileInfo, len(w.files))
for k, v := range w.files {
files[k] = v
}
Expand Down Expand Up @@ -497,9 +505,12 @@ func (w *Watcher) retrieveFileList() map[string]os.FileInfo {
if err != nil {
if os.IsNotExist(err) {
w.mu.Unlock()
if name == err.(*os.PathError).Path {
w.Error <- ErrWatchedFileDeleted
w.RemoveRecursive(name)
pathErr, ok := err.(*os.PathError)
if ok {
if name == pathErr.Path {
w.Error <- ErrWatchedFileDeleted
w.RemoveRecursive(name)
}
}
w.mu.Lock()
} else {
Expand All @@ -511,9 +522,12 @@ func (w *Watcher) retrieveFileList() map[string]os.FileInfo {
if err != nil {
if os.IsNotExist(err) {
w.mu.Unlock()
if name == err.(*os.PathError).Path {
w.Error <- ErrWatchedFileDeleted
w.Remove(name)
pathErr, ok := err.(*os.PathError)
if ok {
if name == pathErr.Path {
w.Error <- ErrWatchedFileDeleted
w.Remove(name)
}
}
w.mu.Lock()
} else {
Expand Down Expand Up @@ -550,6 +564,10 @@ func (w *Watcher) Start(d time.Duration) error {
// Unblock w.Wait().
w.wg.Done()

var scanNowRequest chan struct{}

defer close(w.Closed)

for {
// done lets the inner polling cycle loop know when the
// current cycle's method has finished executing.
Expand All @@ -564,6 +582,7 @@ func (w *Watcher) Start(d time.Duration) error {

// cancel can be used to cancel the current event polling function.
cancel := make(chan struct{})
defer close(cancel)

// Look for events.
go func() {
Expand All @@ -578,8 +597,6 @@ func (w *Watcher) Start(d time.Duration) error {
for {
select {
case <-w.close:
close(cancel)
close(w.Closed)
return nil
case event := <-evt:
if len(w.ops) > 0 { // Filter Ops.
Expand All @@ -589,7 +606,7 @@ func (w *Watcher) Start(d time.Duration) error {
}
}
numEvents++
if w.maxEvents > 0 && numEvents > w.maxEvents {
if scanNowRequest == nil && w.maxEvents > 0 && numEvents > w.maxEvents {
close(cancel)
break inner
}
Expand All @@ -604,8 +621,53 @@ func (w *Watcher) Start(d time.Duration) error {
w.files = fileList
w.mu.Unlock()

if scanNowRequest != nil {
close(scanNowRequest)
scanNowRequest = nil
}

// Sleep and then continue to the next loop iteration.
time.Sleep(d)
// If a request to do a full scan is received, handle it and then signal to the requester it is complete.
select {
case <-w.close: // break out of wait early if we get a Close
return nil
case scanNowRequest = <-w.scanNow: // sync scan request received
case <-time.After(d): // periodic re-roll time elapsed
}
}
}

// ScanNow can be called on a already running watcher to perform an immediate synchronous scan of all watched files
// and generate the events for any changes. When ScanNow() returns to the caller, all events for any changed files
// have been published. ScanNow() can be used when you know FS changes have occurred and you want to ensure all events
// for the changes have been gathered before continuing, for example, to better process batched updates to groups of
// files.
// You can also specify a very long poll duration and then use ScanNow() to break from the poll wait and perform a scan
// before going back to sleep.
func (w *Watcher) ScanNow() error {
w.mu.Lock()
if !w.running {
w.mu.Unlock()
return ErrWatcherNotRunning
}
w.mu.Unlock()

scanComplete := make(chan struct{})
select {
case w.scanNow <- scanComplete:
case <-w.close:
// if the watcher is no longer running, or is closed whilst we're waiting for our scan to be accepted, return
// an error
return ErrWatcherNotRunning
}

select {
case <-w.close:
// if the watcher is closed whilst we're waiting for our scan to complete, return an error
return ErrWatcherNotRunning
case <-scanComplete:
// scan completed ok
return nil
}
}

Expand Down Expand Up @@ -633,7 +695,7 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, evt chan Event,
creates[path] = info
continue
}
if oldInfo.ModTime() != info.ModTime() {
if oldInfo.ModTime() != info.ModTime() || oldInfo.Size() != info.Size() {
select {
case <-cancel:
return
Expand Down Expand Up @@ -700,6 +762,7 @@ func (w *Watcher) Wait() {
}

// Close stops a Watcher and unlocks its mutex, then sends a close signal.
// Note, it is not safe to Start() a Watcher again after closing it. You must create a new Watcher.
func (w *Watcher) Close() {
w.mu.Lock()
if !w.running {
Expand All @@ -711,5 +774,6 @@ func (w *Watcher) Close() {
w.names = make(map[string]bool)
w.mu.Unlock()
// Send a close signal to the Start method.
w.close <- struct{}{}
// Use a channel close() rather than a channel write, so that ScanNow() can react to the closure also.
close(w.close)
}
155 changes: 155 additions & 0 deletions watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,161 @@ func TestTriggerEvent(t *testing.T) {
wg.Wait()
}

func TestScanNow(t *testing.T) {
testDir, teardown := setup(t)
defer teardown()

w := New()
w.FilterOps(Create)

// Add the testDir to the watchlist.
if err := w.AddRecursive(testDir); err != nil {
t.Fatal(err)
}

// should not be able to ScanNow() before the watcher is started
if err := w.ScanNow(); err != ErrWatcherNotRunning {
t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one")
}

testFilePath := filepath.Join(testDir, "test_file1.txt")
done := make(chan struct{})
go func() {
evt := <-w.Event
if evt.Op == Create && evt.Path == testFilePath {
close(done)
} else {
t.Fatal("unexpected event")
}
}()

// Start scanning with a very long poll duration
go func() {
if err := w.Start(time.Hour); err != nil {
t.Fatal(err)
}
}()

w.Wait()
defer w.Close()

// perform initial scan, which should yield no changes
// this ensures the initial scan has happened, and means the watcher is now waiting 1hr before scanning again
if err := w.ScanNow(); err != nil {
t.Error(err)
}

// wait for a short period just to ensure no unexpected events arrive
select {
case <-time.After(time.Millisecond * 100):
case <-done:
t.Fatal("should not have received an event as no changes have occurred since ScanNow() completed")
}

// create the test file, we will not receive events due to the 1hr poll duration
if err := ioutil.WriteFile(testFilePath, []byte{}, 0755); err != nil {
t.Error(err)
}

// wait for a short period just to ensure no unexpected events arrive now we've changed a file
select {
case <-time.After(time.Millisecond * 100):
case <-done:
t.Fatal("should not have received an event as a poll duration of 1 hour is used")
}

// issue a scan now, and we will receive the events while ScanNow() is running.
if err := w.ScanNow(); err != nil {
t.Error(err)
}

// all events should have been received *whilst* ScanNow() was running, so the done channel should already be
// closed
select {
case <-done:
default:
t.Fatal("events from ScanNow() should have been received before ScanNow() returned")
}

w.Close()

// issue a scan now after closing, should error
if err := w.ScanNow(); err != ErrWatcherNotRunning {
t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one")
}
}

func TestSizeOnlyChange(t *testing.T) {
testDir, teardown := setup(t)
defer teardown()

w := New()
w.FilterOps(Write)

// Add the testDir to the watchlist.
testFilePath := filepath.Join(testDir, "file.txt")
if err := w.Add(testFilePath); err != nil {
t.Fatal(err)
}

done := make(chan struct{})
go func() {
evt := <-w.Event
if evt.Op == Write && evt.Path == testFilePath {
close(done)
} else {
t.Fatal("unexpected event")
}
}()

// Start scanning with a very long poll duration
go func() {
if err := w.Start(time.Hour); err != nil {
t.Fatal(err)
}
}()

w.Wait()
defer w.Close()

// perform initial scan, which should yield no changes
// this ensures the initial scan has happened, and means the watcher is now waiting 1hr before scanning again
if err := w.ScanNow(); err != nil {
t.Error(err)
}

// modify the test file, we will not receive events due to the 1hr poll duration
// when modifying, we ensure the mod time does not change. this tests the situation where a file system has to
// be able to detect multiple file changes within its mod time resolution, which on some systems can be 1 or 2
// seconds. the watcher should detect the change because the size of the file has changed.
stat, err := os.Stat(testFilePath)
if err != nil {
t.Error(err)
}
if err = ioutil.WriteFile(testFilePath, []byte("bigger than before"), 0755); err != nil {
t.Error(err)
}
if err = os.Chtimes(testFilePath, stat.ModTime(), stat.ModTime()); err != nil {
t.Error(err)
}

// issue a scan now, and we will receive the events while ScanNow() is running.
if err := w.ScanNow(); err != nil {
t.Error(err)
}

// all events should have been received *whilst* ScanNow() was running, but our handler may still be processing
// the event, so we'll wait for a little while
// closed
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("Did not detect a size only change (no mod time change)")
}

w.Close()
}

func TestEventAddFile(t *testing.T) {
testDir, teardown := setup(t)
defer teardown()
Expand Down