Skip to content

Commit

Permalink
Updated and Watcher almost done
Browse files Browse the repository at this point in the history
  • Loading branch information
midstar committed Apr 21, 2019
1 parent 9f3eab6 commit 94c8862
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 134 deletions.
8 changes: 4 additions & 4 deletions media.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Media struct {
autoRotate bool // Rotate JPEG files when needed
box *rice.Box // For icons
thumbGenInProgress bool // True if thumbnail generation in progress
stopWatcherChan chan bool // Set to true to stop the Watcher go-routine
watcher *Watcher // The media watcher
}

// File represents a folder or any other file
Expand Down Expand Up @@ -62,14 +62,14 @@ func createMedia(box *rice.Box, mediaPath string, thumbPath string, enableThumbC
enableThumbCache: enableThumbCache,
autoRotate: autoRotate,
box: box,
thumbGenInProgress: false,
stopWatcherChan: make(chan bool)}
thumbGenInProgress: false}
llog.Info("Video thumbnails supported (ffmpeg installed): %v", media.videoThumbnailSupport())
if enableThumbCache && genThumbsOnStartup {
go media.generateAllThumbnails()
}
if enableThumbCache && startWatcher {
go media.startWatcher()
media.watcher = createWatcher(media)
go media.watcher.startWatcher()
}
return media
}
Expand Down
45 changes: 24 additions & 21 deletions updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type Updater struct {
directories map[string]time.Time // Key: Path, Value: Last update
mutex sync.Mutex // For thread safety
minTimeSinceChangeSec int // Minimum time since change of directory before update
exit bool // Flag to indicate that updater shall exit
stopUpdaterChan chan bool // Set to true to stop the updater go-routine
done chan bool // Set to true when updater go-routine has stopped
media mediaInterface // To run the actual thumbnail update
}

Expand All @@ -33,26 +34,25 @@ func createUpdater(media mediaInterface) *Updater {
directories: make(map[string]time.Time),
mutex: sync.Mutex{},
minTimeSinceChangeSec: 5, // Five seconds
exit: false,
media: media}
stopUpdaterChan: make(chan bool),
done: make(chan bool),
media: media}
}

func (u *Updater) startUpdater() {
llog.Info("Starting updater")
go u.updaterThread()
}

func (u *Updater) stopUpdater() {
u.mutex.Lock()
defer u.mutex.Unlock()

u.exit = true
func (u *Updater) stopUpdater() chan bool {
u.stopUpdaterChan <- true
return u.done
}

func (u *Updater) exitIsSet() bool {
u.mutex.Lock()
defer u.mutex.Unlock()

return u.exit
// stopUpdaterAndWait similar to stopUpdater but waits
// for updater go-routine to stop
func (u *Updater) stopUpdaterAndWait() {
<-u.stopUpdater()
}

// markDirectoryAsUpdated adds the directory for update if
Expand Down Expand Up @@ -110,14 +110,17 @@ func (u *Updater) nextDirectoryToUpdate() (string, bool) {

func (u *Updater) updaterThread() {
for {
time.Sleep(1 * time.Second)
if u.exitIsSet() {
break
}
path, ok := u.nextDirectoryToUpdate()
if ok {
u.media.generateThumbnails(path, false)
select {
case <-time.After(1 * time.Second):
path, ok := u.nextDirectoryToUpdate()
if ok {
llog.Info("Updating thumbs in %s", path)
u.media.generateThumbnails(path, false)
}
case <-u.stopUpdaterChan:
llog.Info("Shutting down updater")
u.done <- true
return
}
}
llog.Info("Shutting down updater")
}
3 changes: 1 addition & 2 deletions updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func TestUpdaterThread(t *testing.T) {
time.Sleep(2000 * time.Millisecond)
assertEqualsStr(t, "", "dir1", lastPathGenerated)

u.stopUpdater()
time.Sleep(1500 * time.Millisecond)
u.stopUpdaterAndWait()

}
148 changes: 73 additions & 75 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,67 @@ package main
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"

"github.com/fsnotify/fsnotify"
"github.com/midstar/llog"
)

// Watcher represents the watcher type
type Watcher struct {
media *Media
updater *Updater
stopWatcherChan chan bool // Set to true to stop the watcher go-routine
done chan bool // Set to true when watcher go-routine has stopped
}

func createWatcher(media *Media) *Watcher {
return &Watcher{
media: media,
updater: createUpdater(media),
stopWatcherChan: make(chan bool),
done: make(chan bool)}
}

// stopWatcher stops the media watcher go-routine if it is running.
// It is perfectly ok to call this function even if the watcher is
// not running.
func (m Media) stopWatcher() {
m.stopWatcherChan <- true
// Also stops the updater go-routine.
// Returns the Watcher done channel and the Updater done channel
func (w *Watcher) stopWatcher() (chan bool, chan bool) {
updaterDone := w.updater.stopUpdater()
w.stopWatcherChan <- true
return w.done, updaterDone
}

// stopWatcherAndWait similar to stopUpdater but waits
// for the watcher and updater go-routines to stop
func (w *Watcher) stopWatcherAndWait() {
watcherDone, updaterDone := w.stopWatcher()
<-watcherDone
<-updaterDone
}

// startWatcher identifies all folders within the mediaPath including
// subfolders and starts the folder watcher go routine.
func (m *Media) startWatcher() {
func (w *Watcher) startWatcher() {
llog.Info("Starting media watcher")
w.updater.startUpdater()
watcher, err := fsnotify.NewWatcher()
if err != nil {
llog.Error("Unable to watch for new media since: %s", err)
return
}

go m.mediaWatcher(watcher)
go w.mediaWatcher(watcher)

m.watchFolder(watcher, m.mediaPath) // TODO put back
w.watchFolder(watcher, w.media.mediaPath)
}

// watchFolder with watch the provided folder including its
// sub folders (i.e. recursively).
// The error return value is just for test purposes.
func (m *Media) watchFolder(watcher *fsnotify.Watcher, path string) error {
func (w *Watcher) watchFolder(watcher *fsnotify.Watcher, path string) error {
err := watcher.Add(path)
if err != nil {
llog.Error("Watch folder %s error: %s", path, err)
Expand All @@ -51,7 +78,7 @@ func (m *Media) watchFolder(watcher *fsnotify.Watcher, path string) error {

for _, fileInfo := range fileInfos {
if fileInfo.IsDir() {
m.watchFolder(watcher, filepath.Join(path, fileInfo.Name()))
w.watchFolder(watcher, filepath.Join(path, fileInfo.Name()))
}
}
return nil
Expand All @@ -60,92 +87,63 @@ func (m *Media) watchFolder(watcher *fsnotify.Watcher, path string) error {
// mediaWatcher contains the loop that watches the file events.
// Call stopWatcher to exit.
//
// Limitation 1:
// The Write event is fired of several reasons and there might
// be multiple Write events for one operation. Therefore we
// ignore this event. Basically this means that if the media
// is modified we won't detect it.
//
// Limitation 2:
// If a subfolder is created we probably won't detect the first
// file(s) created in that folder, since the create file events
// of the first files will be generated after we have been able
// to watch the event.
//
// Limitation 3:
// If a directory is removed we will not remove the thumbnails
// generated in that directory
func (m *Media) mediaWatcher(watcher *fsnotify.Watcher) {
// Note that we ignore rename and delete events, i.e. there
// is no clean up.
func (w *Watcher) mediaWatcher(watcher *fsnotify.Watcher) {
for {
select {
case event, ok := <-watcher.Events:
if ok {
llog.Debug("Watcher event: %s", event) // TODO change to llog.Debug
path, err := m.getRelativeMediaPath(event.Name)
llog.Debug("Watcher event: %s", event)
path := event.Name
// relativeMediaPath is always the last diretory, never a file
// (because we call getDir)
relativeMediaPath, err := w.media.getRelativeMediaPath(getDir(path))
if err == nil {
if m.isImage(event.Name) || m.isVideo(event.Name) {
if event.Op&fsnotify.Rename == fsnotify.Rename ||
event.Op&fsnotify.Remove == fsnotify.Remove {
// Remove thumbnail if it exist
thumbPath, err := m.thumbnailPath(path)
if err == nil {
llog.Info("Removing thumbnail if it exist: %s", thumbPath)
os.Remove(thumbPath)
}
} else if event.Op&fsnotify.Create == fsnotify.Create {
// Create thumbnail
waitFileReady(event.Name)
m.generateThumbnail(path)
}
} else if event.Op&fsnotify.Create == fsnotify.Create {
// Check if it was a diretory that was created
if _, err := ioutil.ReadDir(event.Name); err == nil {
llog.Info("Watching new folder %s", event.Name)
m.watchFolder(watcher, event.Name)
if event.Op&fsnotify.Create == fsnotify.Create {
if isDir(path) {
// This is an new diretory
// Watch it
w.watchFolder(watcher, path)
}
// Mark the directory as changed so that updater eventually
// will create the thumbnails
w.updater.markDirectoryAsUpdated(relativeMediaPath)
} else if event.Op&fsnotify.Write == fsnotify.Write {
// Tell updater that there is operations performed in the
// directory (i.e. wait for a while before generating the
// thumbnails)
w.updater.touchDirectory(relativeMediaPath)
}
}
}
case err, ok := <-watcher.Errors:
if ok {
llog.Warn("Watcher error: %s", err)
}
case <-m.stopWatcherChan:
case <-w.stopWatcherChan:
llog.Info("Shutting down media watcher")
watcher.Close()
w.done <- true
return
}
}
}

// waitFileReady tries to figure out if the file is ready to process
// by checking for following:
// 1 - File is possible to open (i.e. is not locked by another process)
// 2 - File is not growing (i.e. it is not written by another process)
// Times out after 5 seconds.
func waitFileReady(fileName string) error {
for i := 0; i < 50; i++ {
time.Sleep(100 * time.Millisecond)
file, err := os.Open(fileName)
if err != nil {
// File is probably locked
llog.Info("File %s is locked - test again", fileName)
continue
}
// Check if file is growing
stat, _ := file.Stat()
sizeBefore := stat.Size()
time.Sleep(100 * time.Millisecond)
stat, _ = file.Stat()
sizeAfter := stat.Size()
file.Close()
if sizeBefore == sizeAfter {
// File did not grow. All ok
return nil
}
// isDir return true if the path is a directory
func isDir(path string) bool {
_, err := ioutil.ReadDir(path)
return err == nil
}

// getDir removes the file (if such exist) from a path.
// Always returns path with front slash separator
func getDir(path string) string {
var result string
if isDir(path) {
result = path
} else {
result = filepath.Dir(path)
}
reason := fmt.Sprintf("File '%s' locked for more than 5 seconds", fileName)
llog.Warn(reason)
return fmt.Errorf(reason)
return filepath.ToSlash(result)
}
Loading

0 comments on commit 94c8862

Please sign in to comment.