Skip to content

Commit

Permalink
windows: expose support for recursive directory watches
Browse files Browse the repository at this point in the history
  • Loading branch information
nicks committed Apr 20, 2021
1 parent 846b32d commit 2cb9c60
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 15 deletions.
7 changes: 7 additions & 0 deletions fen.go
Expand Up @@ -21,6 +21,13 @@ func NewWatcher() (*Watcher, error) {
return nil, errors.New("FEN based watcher not yet supported for fsnotify\n")
}

// Recursively watches directories if this file system supports that.
// Must be called before Add/Remove.
// Returns an error on failure.
func (w *Watcher) SetRecursive() error {
return fmt.Error("Not supported")
}

// Close removes all watches and closes the events channel.
func (w *Watcher) Close() error {
return nil
Expand Down
7 changes: 7 additions & 0 deletions inotify.go
Expand Up @@ -69,6 +69,13 @@ func (w *Watcher) isClosed() bool {
}
}

// Recursively watches directories if this file system supports that.
// Must be called before Add/Remove.
// Returns an error on failure.
func (w *Watcher) SetRecursive() error {
return fmt.Error("Not supported")
}

// Close removes all watches and closes the events channel.
func (w *Watcher) Close() error {
if w.isClosed() {
Expand Down
99 changes: 99 additions & 0 deletions integration_test.go
Expand Up @@ -1226,6 +1226,105 @@ func TestRemoveWithClose(t *testing.T) {
}
}

func TestSetRecursive(t *testing.T) {
watcher := newWatcher(t)
err := watcher.SetRecursive()
expectRecursive := err == nil
isWindows := runtime.GOOS == "windows"
if isWindows != expectRecursive {
t.Fatalf("Only expected SetRecursive to succeed on windows")
}

// Create directory to watch
testDir := tempMkdir(t)
defer os.RemoveAll(testDir)

testSubDir := filepath.Join(testDir, "sub")
testSubDirFile := filepath.Join(testDir, "sub/TestFsnotifyFile1.testfile")

// Receive errors on the error channel on a separate goroutine
go func() {
for err := range watcher.Errors {
t.Fatalf("error received: %s", err)
}
}()

// Receive events on the event channel on a separate goroutine
eventstream := watcher.Events
var createReceived, deleteReceived counter
done := make(chan bool)
go func() {
for event := range eventstream {
// Only count relevant events
expected := event.Name == filepath.Clean(testDir) || event.Name == filepath.Clean(testSubDir)
if expectRecursive {
expected = expected || event.Name == filepath.Clean(testSubDirFile)
}
if expected {
t.Logf("event received: %s", event)
if event.Op&Create == Create {
createReceived.increment()
}
if event.Op&Remove == Remove {
deleteReceived.increment()
}
} else {
t.Logf("unexpected event received: %s", event)
}
}
done <- true
}()

addWatch(t, watcher, testDir)

// Create sub-directory
if err := os.Mkdir(testSubDir, 0777); err != nil {
t.Fatalf("failed to create test sub-directory: %s", err)
}

// Create a file (Should not see this! we are not watching subdir)
var fs *os.File
fs, err = os.OpenFile(testSubDirFile, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
t.Fatalf("creating test file failed: %s", err)
}
fs.Sync()
fs.Close()

time.Sleep(200 * time.Millisecond)

// Make sure receive deletes for both file and sub-directory
os.RemoveAll(testSubDir)

// We expect this event to be received almost immediately, but let's wait 500 ms to be sure
time.Sleep(500 * time.Millisecond)
cReceived := createReceived.value()
expectedCReceived := int32(1)
expectedDReceived := int32(1)
if expectRecursive {
expectedCReceived = 2
expectedDReceived = 2
}
if cReceived != expectedCReceived {
t.Fatalf("incorrect number of create events received after 500 ms (%d vs %d)", cReceived, expectedCReceived)
}
dReceived := deleteReceived.value()
if dReceived != expectedDReceived {
t.Fatalf("incorrect number of delete events received after 500 ms (%d vs %d)", dReceived, expectedDReceived)
}

// Try closing the fsnotify instance
t.Log("calling Close()")
watcher.Close()
t.Log("waiting for the event channel to become closed...")
select {
case <-done:
t.Log("event channel closed")
case <-time.After(2 * time.Second):
t.Fatal("event stream was not closed after 2 seconds")
}
}

func testRename(file1, file2 string) error {
switch runtime.GOOS {
case "windows", "plan9":
Expand Down
7 changes: 7 additions & 0 deletions kqueue.go
Expand Up @@ -65,6 +65,13 @@ func NewWatcher() (*Watcher, error) {
return w, nil
}

// Recursively watches directories if this file system supports that.
// Must be called before Add/Remove.
// Returns an error on failure.
func (w *Watcher) SetRecursive() error {
return fmt.Error("Not supported")
}

// Close removes all watches and closes the events channel.
func (w *Watcher) Close() error {
w.mu.Lock()
Expand Down
43 changes: 28 additions & 15 deletions windows.go
Expand Up @@ -19,14 +19,15 @@ import (

// Watcher watches a set of files, delivering events to a channel.
type Watcher struct {
Events chan Event
Errors chan error
isClosed bool // Set to true when Close() is first called
mu sync.Mutex // Map access
port syscall.Handle // Handle to completion port
watches watchMap // Map of watches (key: i-number)
input chan *input // Inputs to the reader are sent on this channel
quit chan chan<- error
Events chan Event
Errors chan error
isClosed bool // Set to true when Close() is first called
mu sync.Mutex // Map access
port syscall.Handle // Handle to completion port
watches watchMap // Map of watches (key: i-number)
input chan *input // Inputs to the reader are sent on this channel
quit chan chan<- error
recursive bool
}

// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
Expand All @@ -36,17 +37,29 @@ func NewWatcher() (*Watcher, error) {
return nil, os.NewSyscallError("CreateIoCompletionPort", e)
}
w := &Watcher{
port: port,
watches: make(watchMap),
input: make(chan *input, 1),
Events: make(chan Event, 50),
Errors: make(chan error),
quit: make(chan chan<- error, 1),
port: port,
watches: make(watchMap),
input: make(chan *input, 1),
Events: make(chan Event, 50),
Errors: make(chan error),
quit: make(chan chan<- error, 1),
recursive: false,
}
go w.readEvents()
return w, nil
}

// Recursively watches directories if this file system supports that.
// Must be called before Add/Remove.
// Returns an error on failure.
func (w *Watcher) SetRecursive() error {
if len(w.watches) > 0 {
return fmt.Errorf("Cannot call SetRecursive after watches are set")
}
w.recursive = true
return nil
}

// Close removes all watches and closes the events channel.
func (w *Watcher) Close() error {
if w.isClosed {
Expand Down Expand Up @@ -348,7 +361,7 @@ func (w *Watcher) startRead(watch *watch) error {
return nil
}
e := syscall.ReadDirectoryChanges(watch.ino.handle, &watch.buf[0],
uint32(unsafe.Sizeof(watch.buf)), false, mask, nil, &watch.ov, 0)
uint32(unsafe.Sizeof(watch.buf)), w.recursive, mask, nil, &watch.ov, 0)
if e != nil {
err := os.NewSyscallError("ReadDirectoryChanges", e)
if e == syscall.ERROR_ACCESS_DENIED && watch.mask&provisional == 0 {
Expand Down

0 comments on commit 2cb9c60

Please sign in to comment.