diff --git a/fen.go b/fen.go index ced39cb8..f0805a1b 100644 --- a/fen.go +++ b/fen.go @@ -21,6 +21,13 @@ func NewWatcher() (*Watcher, error) { return nil, errors.New("FEN based watcher not yet supported for fsnotify\n") } +// Recursively watches directories if this file system supports that. +// Must be called before Add/Remove. +// Returns an error on failure. +func (w *Watcher) SetRecursive() error { + return fmt.Error("Not supported") +} + // Close removes all watches and closes the events channel. func (w *Watcher) Close() error { return nil diff --git a/inotify.go b/inotify.go index c56556f4..b470ced6 100644 --- a/inotify.go +++ b/inotify.go @@ -69,6 +69,13 @@ func (w *Watcher) isClosed() bool { } } +// Recursively watches directories if this file system supports that. +// Must be called before Add/Remove. +// Returns an error on failure. +func (w *Watcher) SetRecursive() error { + return fmt.Error("Not supported") +} + // Close removes all watches and closes the events channel. func (w *Watcher) Close() error { if w.isClosed() { diff --git a/integration_test.go b/integration_test.go index 7096344d..0f63f697 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1226,6 +1226,105 @@ func TestRemoveWithClose(t *testing.T) { } } +func TestSetRecursive(t *testing.T) { + watcher := newWatcher(t) + err := watcher.SetRecursive() + expectRecursive := err == nil + isWindows := runtime.GOOS == "windows" + if isWindows != expectRecursive { + t.Fatalf("Only expected SetRecursive to succeed on windows") + } + + // Create directory to watch + testDir := tempMkdir(t) + defer os.RemoveAll(testDir) + + testSubDir := filepath.Join(testDir, "sub") + testSubDirFile := filepath.Join(testDir, "sub/TestFsnotifyFile1.testfile") + + // Receive errors on the error channel on a separate goroutine + go func() { + for err := range watcher.Errors { + t.Fatalf("error received: %s", err) + } + }() + + // Receive events on the event channel on a separate goroutine + eventstream := watcher.Events + var createReceived, deleteReceived counter + done := make(chan bool) + go func() { + for event := range eventstream { + // Only count relevant events + expected := event.Name == filepath.Clean(testDir) || event.Name == filepath.Clean(testSubDir) + if expectRecursive { + expected = expected || event.Name == filepath.Clean(testSubDirFile) + } + if expected { + t.Logf("event received: %s", event) + if event.Op&Create == Create { + createReceived.increment() + } + if event.Op&Remove == Remove { + deleteReceived.increment() + } + } else { + t.Logf("unexpected event received: %s", event) + } + } + done <- true + }() + + addWatch(t, watcher, testDir) + + // Create sub-directory + if err := os.Mkdir(testSubDir, 0777); err != nil { + t.Fatalf("failed to create test sub-directory: %s", err) + } + + // Create a file (Should not see this! we are not watching subdir) + var fs *os.File + fs, err = os.OpenFile(testSubDirFile, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + t.Fatalf("creating test file failed: %s", err) + } + fs.Sync() + fs.Close() + + time.Sleep(200 * time.Millisecond) + + // Make sure receive deletes for both file and sub-directory + os.RemoveAll(testSubDir) + + // We expect this event to be received almost immediately, but let's wait 500 ms to be sure + time.Sleep(500 * time.Millisecond) + cReceived := createReceived.value() + expectedCReceived := int32(1) + expectedDReceived := int32(1) + if expectRecursive { + expectedCReceived = 2 + expectedDReceived = 2 + } + if cReceived != expectedCReceived { + t.Fatalf("incorrect number of create events received after 500 ms (%d vs %d)", cReceived, expectedCReceived) + } + dReceived := deleteReceived.value() + if dReceived != expectedDReceived { + t.Fatalf("incorrect number of delete events received after 500 ms (%d vs %d)", dReceived, expectedDReceived) + } + + // Try closing the fsnotify instance + t.Log("calling Close()") + watcher.Close() + t.Log("waiting for the event channel to become closed...") + select { + case <-done: + t.Log("event channel closed") + case <-time.After(2 * time.Second): + t.Fatal("event stream was not closed after 2 seconds") + } +} + func testRename(file1, file2 string) error { switch runtime.GOOS { case "windows", "plan9": diff --git a/kqueue.go b/kqueue.go index cc0183e1..59694f7e 100644 --- a/kqueue.go +++ b/kqueue.go @@ -65,6 +65,13 @@ func NewWatcher() (*Watcher, error) { return w, nil } +// Recursively watches directories if this file system supports that. +// Must be called before Add/Remove. +// Returns an error on failure. +func (w *Watcher) SetRecursive() error { + return fmt.Error("Not supported") +} + // Close removes all watches and closes the events channel. func (w *Watcher) Close() error { w.mu.Lock() diff --git a/windows.go b/windows.go index 09436f31..87565d93 100644 --- a/windows.go +++ b/windows.go @@ -19,14 +19,15 @@ import ( // Watcher watches a set of files, delivering events to a channel. type Watcher struct { - Events chan Event - Errors chan error - isClosed bool // Set to true when Close() is first called - mu sync.Mutex // Map access - port syscall.Handle // Handle to completion port - watches watchMap // Map of watches (key: i-number) - input chan *input // Inputs to the reader are sent on this channel - quit chan chan<- error + Events chan Event + Errors chan error + isClosed bool // Set to true when Close() is first called + mu sync.Mutex // Map access + port syscall.Handle // Handle to completion port + watches watchMap // Map of watches (key: i-number) + input chan *input // Inputs to the reader are sent on this channel + quit chan chan<- error + recursive bool } // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. @@ -36,17 +37,29 @@ func NewWatcher() (*Watcher, error) { return nil, os.NewSyscallError("CreateIoCompletionPort", e) } w := &Watcher{ - port: port, - watches: make(watchMap), - input: make(chan *input, 1), - Events: make(chan Event, 50), - Errors: make(chan error), - quit: make(chan chan<- error, 1), + port: port, + watches: make(watchMap), + input: make(chan *input, 1), + Events: make(chan Event, 50), + Errors: make(chan error), + quit: make(chan chan<- error, 1), + recursive: false, } go w.readEvents() return w, nil } +// Recursively watches directories if this file system supports that. +// Must be called before Add/Remove. +// Returns an error on failure. +func (w *Watcher) SetRecursive() error { + if len(w.watches) > 0 { + return fmt.Errorf("Cannot call SetRecursive after watches are set") + } + w.recursive = true + return nil +} + // Close removes all watches and closes the events channel. func (w *Watcher) Close() error { if w.isClosed { @@ -348,7 +361,7 @@ func (w *Watcher) startRead(watch *watch) error { return nil } e := syscall.ReadDirectoryChanges(watch.ino.handle, &watch.buf[0], - uint32(unsafe.Sizeof(watch.buf)), false, mask, nil, &watch.ov, 0) + uint32(unsafe.Sizeof(watch.buf)), w.recursive, mask, nil, &watch.ov, 0) if e != nil { err := os.NewSyscallError("ReadDirectoryChanges", e) if e == syscall.ERROR_ACCESS_DENIED && watch.mask&provisional == 0 {