Permalink
Browse files

Add file watch service

  • Loading branch information...
1 parent 6414ff3 commit ff3aa0688a2eac37cfad920c5b067ec75ec59549 @longkerdandy committed Aug 13, 2011
Binary file not shown.
@@ -14,6 +14,12 @@
public final static String MOVIE_SCAN_PROVIDED_TOPIC = "org/chii2/medialibrary/file/movie/SCAN_PROVIDED";
// Event Topic for image scan
public final static String IMAGE_SCAN_PROVIDED_TOPIC = "org/chii2/medialibrary/file/image/SCAN_PROVIDED";
+ // Event Topic for image watch create
+ public final static String IMAGE_WATCH_CREATE_TOPIC = "org/chii2/medialibrary/file/image/WATCH_CREATE";
+ // Event Topic for image watch modify
+ public final static String IMAGE_WATCH_MODIFY_TOPIC = "org/chii2/medialibrary/file/image/WATCH_MODIFY";
+ // Event Topic for image watch delete
+ public final static String IMAGE_WATCH_DELETE_TOPIC = "org/chii2/medialibrary/file/image/WATCH_DELETE";
// Directory property in the request event
public final static String DIRECTORY_PROPERTY = "directories";
// Filter property in the internal event
@@ -41,6 +41,10 @@
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
+ <groupId>jpathwatch</groupId>
+ <artifactId>jpathwatch</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
@@ -1,9 +1,11 @@
package org.chii2.medialibrary.file;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.chii2.medialibrary.api.file.FileService;
-import org.chii2.medialibrary.file.consumer.FileExtensionFilter;
-import org.chii2.medialibrary.file.consumer.FileScanner;
+import org.chii2.medialibrary.file.filter.FileExtensionFilter;
+import org.chii2.medialibrary.file.scanner.FileScanner;
+import org.chii2.medialibrary.file.watcher.FileWatcher;
import org.chii2.util.ConfigUtils;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
@@ -47,6 +49,8 @@
private List<String> imageDirectories = Arrays.asList(System.getProperty("user.home") + "/Pictures");
// Image file extension filter
private List<String> imageExtFilters = Arrays.asList(".jpg", ".jpeg", ".tiff", ".tif", ".png", ".gif", ".bmp");
+ // Image Watcher
+ private FileWatcher imageWatcher;
// Logger
private Logger logger = LoggerFactory.getLogger("org.chii2.medialibrary.file");
@@ -58,7 +62,7 @@ public void init() {
logger.debug("Chii2 Media Library File Service init.");
Dictionary props = null;
try {
- Configuration config = configAdmin.getConfiguration(CONFIG_FILE);
+ Configuration config = this.configAdmin.getConfiguration(CONFIG_FILE);
props = config.getProperties();
} catch (IOException e) {
logger.error("FileService fail to load configuration with exception: {}.", e.getMessage());
@@ -107,15 +111,21 @@ public void init() {
// Init queue
this.queue = new LinkedBlockingQueue<Map<String, Object>>();
- // Start Request Consumer
+ // Start File Scanner
new Thread(new FileScanner(queue, eventAdmin)).start();
+
+ // New Image Watcher
+ this.imageWatcher = new FileWatcher(this.imageDirectories, this.createFilter(this.imageExtFilters), FileService.IMAGE_WATCH_CREATE_TOPIC, FileService.IMAGE_WATCH_MODIFY_TOPIC, FileService.IMAGE_WATCH_DELETE_TOPIC, this.eventAdmin);
+ new Thread(this.imageWatcher).start();
}
/**
* Life Cycle Destroy
*/
@SuppressWarnings("unused")
public void destroy() {
+ // Stop Threads
+ this.imageWatcher.shouldStop = true;
logger.debug("Chii2 Media Library File Service destroy.");
}
@@ -136,7 +146,7 @@ public void handleEvent(Event event) {
try {
this.queue.put(properties);
} catch (InterruptedException e) {
- logger.error("Provider producer has been interrupted with error: {}.", e.getMessage());
+ logger.error("Provider producer has been interrupted with error: {}.", ExceptionUtils.getMessage(e));
}
} else if (FileService.IMAGE_SCAN_REQUEST_TOPIC.equals(event.getTopic())) {
List<File> directoryList = new ArrayList<File>();
@@ -153,7 +163,7 @@ public void handleEvent(Event event) {
try {
this.queue.put(properties);
} catch (InterruptedException e) {
- logger.error("Provider producer has been interrupted with error: {}.", e.getMessage());
+ logger.error("Provider producer has been interrupted with error: {}.", ExceptionUtils.getMessage(e));
}
}
}
@@ -1,4 +1,4 @@
-package org.chii2.medialibrary.file.consumer;
+package org.chii2.medialibrary.file.filter;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -16,7 +16,7 @@
// Acceptable File Extensions
private List<String> acceptableExtensions;
// Logger
- private Logger logger = LoggerFactory.getLogger("org.chii2.medialibrary.file");
+ private Logger logger = LoggerFactory.getLogger("org.chii2.medialibrary.file.filter");
public FileExtensionFilter(List<String> acceptableExtensions) {
this.acceptableExtensions = acceptableExtensions;
@@ -1,5 +1,6 @@
-package org.chii2.medialibrary.file.consumer;
+package org.chii2.medialibrary.file.scanner;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.chii2.medialibrary.api.file.FileService;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -20,7 +21,7 @@
// EventAdmin
private EventAdmin eventAdmin;
// Logger
- private Logger logger = LoggerFactory.getLogger("org.chii2.medialibrary.file");
+ private Logger logger = LoggerFactory.getLogger("org.chii2.medialibrary.file.scanner");
/**
* Constructor
@@ -56,7 +57,7 @@ public void run() {
}
}
} catch (SecurityException e) {
- logger.warn("File access failed with exception: {}", e.getMessage());
+ logger.warn("File access failed with exception: {}", ExceptionUtils.getMessage(e));
}
if (files != null && !files.isEmpty()) {
postEvent(files, topic);
@@ -65,7 +66,7 @@ public void run() {
logger.debug("File Scanner stop.");
}
} catch (InterruptedException e) {
- logger.error("Provider consumer has been interrupted with error: {}.", e.getMessage());
+ logger.error("Provider consumer has been interrupted with error: {}.", ExceptionUtils.getMessage(e));
}
}
@@ -0,0 +1,212 @@
+package org.chii2.medialibrary.file.watcher;
+
+import name.pachler.nio.file.*;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.chii2.medialibrary.api.file.FileService;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * FileWatcher based on JPathWatch, File/Directory watch service
+ */
+public class FileWatcher implements Runnable {
+ // Flag
+ public volatile boolean shouldStop = false;
+ // Directory to be watched
+ private List<String> directories;
+ // File Filter
+ private FileFilter filter;
+ // Create Event Topic
+ private String createTopic;
+ // Modify Event Topic
+ private String modifyTopic;
+ // Delete Event Topic
+ private String deleteTopic;
+ // EventAdmin
+ private EventAdmin eventAdmin;
+ // Logger
+ private Logger logger = LoggerFactory.getLogger("org.chii2.medialibrary.file.watcher");
+
+ /**
+ * Constructor
+ *
+ * @param directories Directory to be watched
+ * @param filter File Name Filter
+ * @param createTopic Create Event Topic
+ * @param modifyTopic Modify Event Topic
+ * @param deleteTopic Delete Event Topic
+ * @param eventAdmin EventAdmin
+ */
+ public FileWatcher(List<String> directories, FileFilter filter, String createTopic, String modifyTopic, String deleteTopic, EventAdmin eventAdmin) {
+ this.directories = directories;
+ this.filter = filter;
+ this.createTopic = createTopic;
+ this.modifyTopic = modifyTopic;
+ this.deleteTopic = deleteTopic;
+ this.eventAdmin = eventAdmin;
+ }
+
+ @Override
+ public void run() {
+ // New Watch Service
+ WatchService watchService = FileSystems.getDefault().newWatchService();
+
+ // Mapping Cache
+ Map<WatchKey, Path> mapping = new HashMap<WatchKey, Path>();
+
+ // Add directories to watch
+ for (String directory : this.directories) {
+ // Path
+ Path path = Paths.get(directory);
+ // Key
+ WatchKey key = null;
+ try {
+ key = path.register(watchService, StandardWatchEventKind.ENTRY_CREATE, StandardWatchEventKind.ENTRY_MODIFY, StandardWatchEventKind.ENTRY_DELETE);
+ } catch (UnsupportedOperationException e) {
+ logger.error("Error when register watcher key: {}", ExceptionUtils.getMessage(e));
+ } catch (IOException e) {
+ logger.error("Error when register watcher key: {}", ExceptionUtils.getMessage(e));
+ }
+ // Add to mapping
+ if (key != null) {
+ mapping.put(key, path);
+ }
+ }
+
+
+ // Loop event handling
+ while (!this.shouldStop) {
+ // take() will block until a file has been created/deleted
+ WatchKey signalledKey = null;
+ try {
+ signalledKey = watchService.take();
+ } catch (InterruptedException e) {
+ logger.warn("Watcher thread has been interrupted: {}", ExceptionUtils.getMessage(e));
+ } catch (ClosedWatchServiceException e) {
+ // other thread closed watch service
+ logger.warn("Watcher thread has been closed by other thread: {}", ExceptionUtils.getMessage(e));
+ break;
+ }
+
+ if (signalledKey != null) {
+ // Get Path
+ Path directory = mapping.get(signalledKey);
+
+ // get list of events from key
+ List<WatchEvent<?>> events = signalledKey.pollEvents();
+
+ // VERY IMPORTANT! call reset() AFTER pollEvents() to allow the
+ // key to be reported again by the watch service
+ signalledKey.reset();
+
+ // Loop Event Type
+ for (WatchEvent event : events) {
+ if (event.kind() == StandardWatchEventKind.ENTRY_CREATE) {
+ Path context = (Path) event.context();
+ logger.debug("Receive a Entry Create watch event for: {}", context.toString());
+ // File
+ File file = new File(directory.toString(), context.toString());
+ if (file.exists()) {
+ // Directory
+ if (file.isDirectory()) {
+ // Path
+ Path path = Paths.get(file.getAbsolutePath());
+ // Key
+ WatchKey key = null;
+ try {
+ key = path.register(watchService, StandardWatchEventKind.ENTRY_CREATE, StandardWatchEventKind.ENTRY_MODIFY, StandardWatchEventKind.ENTRY_DELETE);
+ } catch (UnsupportedOperationException e) {
+ logger.error("Error when register watcher key: {}", ExceptionUtils.getMessage(e));
+ } catch (IOException e) {
+ logger.error("Error when register watcher key: {}", ExceptionUtils.getMessage(e));
+ }
+ // Add to mapping
+ if (key != null) {
+ mapping.put(key, path);
+ }
+ }
+ // Filer
+ else if (file.isFile()) {
+ // Name Filter
+ if (this.filter.accept(file)) {
+ // Post Event
+ this.postEvent(file, this.createTopic);
+ }
+ }
+ }
+ } else if (event.kind() == StandardWatchEventKind.ENTRY_MODIFY) {
+ Path context = (Path) event.context();
+ logger.debug("Receive a Entry Modify watch event for: {}", context.toString());
+ // File
+ File file = new File(directory.toString(), context.toString());
+ if (file.exists() && file.isFile()) {
+ this.postEvent(file, this.modifyTopic);
+ }
+ } else if (event.kind() == StandardWatchEventKind.ENTRY_DELETE) {
+ Path context = (Path) event.context();
+ logger.debug("Receive a Entry Delete watch event for: {}", context.toString());
+ // File
+ File file = new File(directory.toString(), context.toString());
+ // Path
+ Path path = Paths.get(file.getAbsolutePath());
+ // Mapping Cache contains path, it's a directory
+ if (mapping.containsValue(path)) {
+ WatchKey cacheKey = this.getKeyByPath(mapping, path);
+ // Canceled
+ cacheKey.cancel();
+ // Remove from mapping cache
+ mapping.remove(cacheKey);
+ }
+ // it's a file
+ else {
+ // Name Filter
+ if (this.filter.accept(file)) {
+ // Post Event
+ this.postEvent(file, this.deleteTopic);
+ }
+ }
+ } else if (event.kind() == StandardWatchEventKind.OVERFLOW) {
+ logger.warn("OVERFLOW: more changes happened than we could retrieve");
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Get WatchKey by Path from Key-Path Mapping Cache
+ *
+ * @param mapping Key-Path Mapping Cache
+ * @param path Path Value
+ * @return WatchKey
+ */
+ private WatchKey getKeyByPath(Map<WatchKey, Path> mapping, Path path) {
+ for (Map.Entry<WatchKey, Path> entry : mapping.entrySet()) {
+ if (entry.getValue().equals(path)) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Send a scanAll event asynchronously
+ *
+ * @param file File discovered
+ * @param topic Event topic
+ */
+ private void postEvent(File file, String topic) {
+ Dictionary<String, Object> properties = new Hashtable<String, Object>();
+ properties.put(FileService.FILE_PROPERTY, file);
+ Event event = new Event(topic, properties);
+ logger.debug("Send a file {} watch event to topic {}.", file.getAbsolutePath(), topic);
+ this.eventAdmin.postEvent(event);
+ }
+}
@@ -4,7 +4,7 @@
import java.util.ArrayList;
import java.util.List;
-import org.chii2.medialibrary.file.consumer.FileExtensionFilter;
+import org.chii2.medialibrary.file.filter.FileExtensionFilter;
import org.testng.annotations.Test;
/**
View
@@ -262,6 +262,13 @@
<version>1.7</version>
</dependency>
+ <!-- Watch Service -->
+ <dependency>
+ <groupId>jpathwatch</groupId>
+ <artifactId>jpathwatch</artifactId>
+ <version>0.94</version>
+ </dependency>
+
<!-- Network: NIO -->
<dependency>
<groupId>com.ning</groupId>
@@ -322,7 +329,7 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
- <version>2.3.4</version>
+ <version>2.3.5</version>
<extensions>true</extensions>
<configuration>
<instructions>

0 comments on commit ff3aa06

Please sign in to comment.