Skip to content
Permalink
Browse files

start an FsWatcher

  • Loading branch information...
plouj authored and plouj-oracle committed Feb 22, 2016
1 parent e9858a1 commit 462a5da23abffe5beba8db299acd4d566ffd66a6
@@ -945,6 +945,7 @@ func defaultConfig(myName string) config.Configuration {
defaultFolder = config.NewFolderConfiguration(folderID, locations[locDefFolder])
defaultFolder.Label = "Default Folder (" + folderID + ")"
defaultFolder.RescanIntervalS = 60
defaultFolder.LongRescanIntervalS = 60 * 60 * 3
defaultFolder.MinDiskFreePct = 1
defaultFolder.Devices = []config.FolderDeviceConfiguration{{DeviceID: myID}}
defaultFolder.AutoNormalize = true
@@ -23,6 +23,7 @@ type FolderConfiguration struct {
Type FolderType `xml:"type,attr" json:"type"`
Devices []FolderDeviceConfiguration `xml:"device" json:"devices"`
RescanIntervalS int `xml:"rescanIntervalS,attr" json:"rescanIntervalS"`
LongRescanIntervalS int `xml:"longRescanIntervalS,attr" json:"longRescanIntervalS"`
IgnorePerms bool `xml:"ignorePerms,attr" json:"ignorePerms"`
AutoNormalize bool `xml:"autoNormalize,attr" json:"autoNormalize"`
MinDiskFreePct float64 `xml:"minDiskFreePct" json:"minDiskFreePct"`
@@ -131,6 +132,12 @@ func (f *FolderConfiguration) prepare() {
f.RescanIntervalS = 0
}

if f.LongRescanIntervalS > MaxRescanIntervalS {
f.LongRescanIntervalS = MaxRescanIntervalS
} else if f.LongRescanIntervalS < 0 {
f.LongRescanIntervalS = 0
}

if f.Versioning.Params == nil {
f.Versioning.Params = make(map[string]string)
}
@@ -0,0 +1,28 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

package fswatcher

import (
"os"
"strings"

"github.com/syncthing/syncthing/lib/logger"
)

var facilityName = "fswatcher"

var (
l = logger.DefaultLogger.NewFacility(facilityName, "Filesystem event watcher")
)

func init() {
l.SetDebug(facilityName, strings.Contains(os.Getenv("STTRACE"), facilityName) || os.Getenv("STTRACE") == "all")
}

func shouldDebug() bool {
return l.ShouldDebug(facilityName)
}
@@ -0,0 +1,213 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

package fswatcher

import (
"errors"
"fmt"
"github.com/zillode/notify"
"os"
"path/filepath"
"strings"
"time"

"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/scanner"
)

type FsEvent struct {
path string
}

var Tempnamer scanner.TempNamer

type FsEventsBatch map[string]*FsEvent

type FsWatcher struct {
folderPath string
notifyModelChan chan<- FsEventsBatch
fsEvents FsEventsBatch
fsEventChan <-chan notify.EventInfo
WatchingFs bool
notifyDelay time.Duration
notifyTimer *time.Timer
notifyTimerNeedsReset bool
inProgress map[string]struct{}
}

const (
slowNotifyDelay = time.Duration(60) * time.Second
fastNotifyDelay = time.Duration(500) * time.Millisecond
)

func NewFsWatcher(folderPath string) *FsWatcher {
return &FsWatcher{
folderPath: folderPath,
notifyModelChan: nil,
fsEvents: make(FsEventsBatch),
fsEventChan: nil,
WatchingFs: false,
notifyDelay: fastNotifyDelay,
notifyTimerNeedsReset: false,
inProgress: make(map[string]struct{}),
}
}

func (watcher *FsWatcher) StartWatchingFilesystem() (<-chan FsEventsBatch, error) {
fsEventChan, err := setupNotifications(watcher.folderPath)
if err == nil {
watcher.WatchingFs = true
watcher.fsEventChan = fsEventChan
go watcher.watchFilesystem()
}
notifyModelChan := make(chan FsEventsBatch)
watcher.notifyModelChan = notifyModelChan
return notifyModelChan, err
}

var maxFiles = 512

func setupNotifications(path string) (chan notify.EventInfo, error) {
c := make(chan notify.EventInfo, maxFiles)
if err := notify.Watch(path, c, notify.All); err != nil {
notify.Stop(c)
close(c)
if strings.Contains(err.Error(), "too many open files") ||
strings.Contains(err.Error(), "no space left on device") {
return nil, errors.New("Please increase inotify limits, see http://bit.ly/1PxkdUC for more information.")
}
return nil, fmt.Errorf(
"Failed to install inotify handler for %s. Error: %s",
path, err)
}
l.Debugf("Setup filesystem notification for %s", path)
return c, nil
}

func (watcher *FsWatcher) watchFilesystem() {
watcher.notifyTimer = time.NewTimer(watcher.notifyDelay)
defer watcher.notifyTimer.Stop()
inProgressItemSubscription := events.Default.Subscribe(
events.ItemStarted | events.ItemFinished)
for {
watcher.resetNotifyTimerIfNeeded()
select {
case event, _ := <-watcher.fsEventChan:
watcher.speedUpNotifyTimer()
watcher.storeFsEvent(event)
case <-watcher.notifyTimer.C:
watcher.actOnTimer()
case event := <-inProgressItemSubscription.C():
watcher.updateInProgressSet(event)
}
}
}

func (watcher *FsWatcher) newFsEvent(eventPath string) *FsEvent {
if isSubpath(eventPath, watcher.folderPath) {
path, _ := filepath.Rel(watcher.folderPath, eventPath)
if !shouldIgnore(path) {
return &FsEvent{path}
}
}
return nil
}

func isSubpath(path string, folderPath string) bool {
if len(path) > 1 && os.IsPathSeparator(path[len(path)-1]) {
path = path[0 : len(path)-1]
}
if len(folderPath) > 1 && os.IsPathSeparator(folderPath[len(folderPath)-1]) {
folderPath = folderPath[0 : len(folderPath)-1]
}
return strings.HasPrefix(path, folderPath)
}

func (watcher *FsWatcher) resetNotifyTimerIfNeeded() {
if watcher.notifyTimerNeedsReset {
l.Debugf("Resetting notifyTimer to %#v\n", watcher.notifyDelay)
watcher.notifyTimer.Reset(watcher.notifyDelay)
watcher.notifyTimerNeedsReset = false
}
}

func (watcher *FsWatcher) speedUpNotifyTimer() {
if watcher.notifyDelay != fastNotifyDelay {
watcher.notifyDelay = fastNotifyDelay
l.Debugf("Speeding up notifyTimer to %#v\n", fastNotifyDelay)
watcher.notifyTimerNeedsReset = true
}
}

func (watcher *FsWatcher) slowDownNotifyTimer() {
if watcher.notifyDelay != slowNotifyDelay {
watcher.notifyDelay = slowNotifyDelay
l.Debugf("Slowing down notifyTimer to %#v\n", watcher.notifyDelay)
watcher.notifyTimerNeedsReset = true
}
}

func (watcher *FsWatcher) storeFsEvent(event notify.EventInfo) {
newEvent := watcher.newFsEvent(event.Path())
if newEvent != nil {
if watcher.pathInProgress(newEvent.path) {
l.Debugf("Skipping notification for finished path: %s\n",
newEvent.path)
} else {
watcher.fsEvents[newEvent.path] = newEvent
}
}
}

func (watcher *FsWatcher) actOnTimer() {
watcher.notifyTimerNeedsReset = true
if len(watcher.fsEvents) > 0 {
l.Debugf("Notifying about %d fs events\n", len(watcher.fsEvents))
watcher.notifyModelChan <- watcher.fsEvents
} else {
watcher.slowDownNotifyTimer()
}
watcher.fsEvents = make(FsEventsBatch)
}

func (watcher *FsWatcher) events() []*FsEvent {
list := make([]*FsEvent, 0, len(watcher.fsEvents))
for _, event := range watcher.fsEvents {
list = append(list, event)
}
return list
}

func (watcher *FsWatcher) updateInProgressSet(event events.Event) {
if event.Type == events.ItemStarted {
path := event.Data.(map[string]string)["item"]
watcher.inProgress[path] = struct{}{}
} else if event.Type == events.ItemFinished {
path := event.Data.(map[string]interface{})["item"].(string)
delete(watcher.inProgress, path)
}
}

func shouldIgnore(path string) bool {
return strings.Contains(path, ".syncthing.") &&
strings.HasSuffix(path, ".tmp") ||
scanner.IsIgnoredPath(path, nil) ||
Tempnamer.IsTemporary(path)
}

func (watcher *FsWatcher) pathInProgress(path string) bool {
_, exists := watcher.inProgress[path]
return exists
}

func (batch FsEventsBatch) GetPaths() []string {
var paths []string
for _, event := range batch {
paths = append(paths, event.path)
}
return paths
}
@@ -0,0 +1,73 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

package fswatcher

import (
"testing"
)

type paths struct {
fullSubPath string
folderPath string
expectedSubPath string
}

func TestRelativeSubPath(t *testing.T) {
// TODO: same for windows
pathSets := []paths{
paths{"/home/user/Sync/blah", "/home/user/Sync/", "blah"},
paths{"/home/user/Sync/blah", "/home/user/Sync", "blah"},
paths{"/home/user/Sync/blah/", "/home/user/Sync/", "blah"},
paths{"/home/user/Sync/blah/", "/home/user/Sync", "blah"},
paths{"/home/user/Sync", "/home/user/Sync", "."},
paths{"/home/user/Sync/", "/home/user/Sync", "."},
paths{"/home/user/Sync", "/home/user/Sync/", "."},
paths{"/home/user/Sync/", "/home/user/Sync/", "."},
}
for _, paths := range pathSets {
result := relativePath(paths.fullSubPath, paths.folderPath)
if result != paths.expectedSubPath {
t.Errorf("Given: sub-path: '%s', folder path: '%s';\n got: '%s' expected '%s'",
paths.fullSubPath, paths.folderPath,
result, paths.expectedSubPath)
}
}
}

type subpathTest struct {
folderPath string
subPath string
isSubpath bool
}

func TestIsSubpath(t *testing.T) {
// TODO: same for windows
tests := []subpathTest{
subpathTest{"/home/user/Sync", "/home/user/Sync/blah", true},
subpathTest{"/home/user/Sync/", "/home/user/Sync/blah", true},
subpathTest{"/home/user/Sync/", "/home/user/Sync/", true},
subpathTest{"/home/user/Sync", "/home/user/Sync", true},
subpathTest{"/home/user/Sync", "/home/user/Sync/", true},
subpathTest{"/home/user/Sync/", "/home/user/Sync", true},
subpathTest{"/home/user/Sync/", "/another/path/Sync", false},
subpathTest{"/", "/", true},
subpathTest{"/", "//", true},
subpathTest{"/", "/some/path/blah", true},
}
for _, test := range tests {
result := isSubpath(test.subPath, test.folderPath)
if result != test.isSubpath {
if test.isSubpath {
t.Errorf("'%s' should be a subpath of '%s'\n",
test.subPath, test.folderPath)
} else {
t.Errorf("'%s' should not be a subpath of '%s'\n",
test.subPath, test.folderPath)
}
}
}
}
@@ -19,18 +19,20 @@ type rescanRequest struct {

// bundle all folder scan activity
type folderScanner struct {
interval time.Duration
timer *time.Timer
now chan rescanRequest
delay chan time.Duration
interval time.Duration
longInterval time.Duration
timer *time.Timer
now chan rescanRequest
delay chan time.Duration
}

func newFolderScanner(config config.FolderConfiguration) folderScanner {
return folderScanner{
interval: time.Duration(config.RescanIntervalS) * time.Second,
timer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
now: make(chan rescanRequest),
delay: make(chan time.Duration),
interval: time.Duration(config.RescanIntervalS) * time.Second,
longInterval: time.Duration(config.LongRescanIntervalS) * time.Second,
timer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
now: make(chan rescanRequest),
delay: make(chan time.Duration),
}
}

@@ -45,6 +47,11 @@ func (f *folderScanner) Reschedule() {
f.timer.Reset(interval)
}

func (f *folderScanner) LongReschedule() {
l.Debugln(f, "next rescan in", f.longInterval)
f.timer.Reset(f.longInterval)
}

func (f *folderScanner) Scan(subdirs []string) error {
req := rescanRequest{
subdirs: subdirs,

0 comments on commit 462a5da

Please sign in to comment.
You can’t perform that action at this time.