Skip to content

Commit

Permalink
initial cut directory watcher watchService
Browse files Browse the repository at this point in the history
not working yet, work in progress
LDEV-1776
  • Loading branch information
zspitzer committed Apr 4, 2018
1 parent 544886a commit fe19fa7
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 0 deletions.
134 changes: 134 additions & 0 deletions core/src/main/java/resource/context/gateway/DirectoryWatcherJava.cfc
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
<!---
* Copyright (c) 2014, the Railo Company Ltd. All rights reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library. If not, see <http://www.gnu.org/licenses/>.
*
--->
component output="no" extends="DirectoryWatcher" {

variables.logFileName = "DirectoryWatcher";
variables.state="stopped";

public void function init (required string id, required struct config, component listener) output="no" {
var cfcatch = "";
try {
variables.id=arguments.id;
variables.config=arguments.config;
if (len(arguments.listener) eq 0){
cflog (text="init #variables.id# Listener is not a component",
type="Error", file="#variables.logFileName#");
return;
}
cflog (text="init #variables.id# [#GetComponentMetaData(arguments.listener).path#]",
type="information", file="#variables.logFileName#");
variables.listener=arguments.listener;
} catch (any) {
_handleError(cfcatch, "init");
}
}

public void function start() output="no" {
try {
var sleepStep = iif(variables.config.interval lt 500, 'variables.config.interval', de(500));
var i= - 1;
var cfcatch = "" ;
var startTime = getTickCount();
if (not StructKeyExists(variables, "listener")){
setState("stopped");
return;
}
while (variables.state EQ "stopping"){
sleep(10);
}
setState("running");
variables._filter = cleanExtensions(variables.config.extensions);

log text="start Directory[#variables.config.directory#]";

variables.methods = {
ENTRY_CREATE: config.addFunction,
ENTRY_MODIFY: config.changeFunction,
ENTRY_DELETE: config.deleteFunction
};

try {
// check if the directory actually exists see https://luceeserver.atlassian.net/browse/LDEV-1767
if ( not DirectoryExists(variables.config.directory) )
log text="start #variables.id# Directory [#variables.config.directory#] does not exist or is not a directory"
type="Error";
} catch (any){
log text="poll Directory [#variables.config.directory#] DirectoryExists threw #cfcatch.message# #cfcatch.stacktrace#"
type="Error";
setState("stopped");
return;
}
if (not StructKeyExists(variables.config,"recurse"))
variables.config.recurse = false;
} catch (any) {
_handleError(cfcatch, "start");
}
// first execution
i = 0;
variables.watcher = new WatchService(variables.config.directory, variables.config.recurse, variables.methods);

while (variables.state EQ "running"){
// if (startTime eq -77){
startTime = getTickCount();
if (variables.state NEQ "running")
break;
var poll = variables.watcher.poll();
if (!IsNull(poll)){
cflog (text="not null", type="Information", file="#variables.logFileName#");


dump(poll); // work in progess
abort;

var events = poll.pollEvents();
dump(events);
for (var event in events) {
var method = variables.methods[event.type];
variables.listener[method](event.file);
}
} else {
cflog (text="null", type="Information", file="#variables.logFileName#");
}

// large directories can take a while and involve heavy io
var executionTime = getTickCount() - startTime;
var warningTimeout = 1000;
if (structKeyExists(variables.config, "warningTimeout") )
warningTimeout = variables.config.warningTimeout;
//if (warningTimeout gt 0 and executionTime gt warningTimeout)
log text="poll #variables.id# Directory [#variables.config.directory#] took #(executionTime)#ms";
// } else {
log text="next poll in #variables.config.interval#ms #sleepStep#";
// }

startTime = -1; // trigger poll next time

// sleep untill the next run, but cut it into half seconds, so we can stop the gateway
for (var i=sleepStep; i lt variables.config.interval; i=i+sleepStep){
sleep(sleepStep);
if (variables.state neq "running")
break;
}
<!--- some extra sleeping if --->
if (variables.config.interval mod sleepStep and variables.state eq "running")
sleep((variables.config.interval mod sleepStep));
}
variables.watcher.close();
setState("stopped")
}
}
91 changes: 91 additions & 0 deletions core/src/main/java/resource/context/gateway/WatchService.cfc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
component {

public any function init(required String directory, Boolean recursive = false ) {

var file = CreateObject("java", "java.io.File").init(arguments.directory);
if (!file.isDirectory()) {
throw("Directory '#arguments.directory#' does not exist or is not a directory");
}
var dir = CreateObject("java", "java.nio.file.Paths");
var path = dir.get(arguments.directory,[]);


variables.watcher = CreateObject("java", "java.nio.file.FileSystems").getDefault().newWatchService();
variables.events = CreateObject("java", "java.nio.file.StandardWatchEventKinds");

path.register(variables.watcher, [variables.events.ENTRY_MODIFY, variables.events.ENTRY_DELETE, variables.events.ENTRY_CREATE] );
return variables.watcher;
}

public any function poll() {
var _poll = variables.watcher.poll();
dump(_poll);
if (IsNull(_poll))
return [];
else
return handleEvents(_poll);
}

public any function take() {
return variables.watcher.take();
}

public void function close() {
variables.watcher.close();
}

private Array function handleEvents(required Any key) {

var events = []
if (!IsNull(arguments.key)) {
var path = variables.keys.get(arguments.key)
if (!IsNull(path)) {
for (var event in arguments.key.pollEvents()) {
var kind = event.kind()
if (kind neq variables.events.OVERFLOW) {
var relativePath = event.context()
// in the case of ENTRY_CREATE, ENTRY_DELETE, and ENTRY_MODIFY events the context is a Path that is the relative path between the directory registered with the watch service, and the entry that is created, deleted, or modified.
var affectedPath = path.resolve(relativePath)
var file = affectedPath.toFile()

if (variables.recursive && kind eq variables.events.ENTRY_CREATE) {
if (file.isDirectory()) {
register(affectedPath, variables.recursive)
}
}

events.append({type: kind.name(), file: file});
}
}

var valid = arguments.key.reset()
if (!valid) {
variables.keys.remove(arguments.key);
}
}
}

return events;
}

private void function register(required Any path, required Boolean recursive) {

var key = arguments.path.register(variables.watcher, [
variables.events.ENTRY_CREATE,
variables.events.ENTRY_MODIFY,
variables.events.ENTRY_DELETE
]);
variables.keys.put(key, arguments.path);

if (arguments.recursive) {
var files = arguments.path.toFile().listFiles();
for (var file in files) {
if (file.isDirectory()) {
register(file.toPath(), arguments.recursive);
}
}
}

}

}

0 comments on commit fe19fa7

Please sign in to comment.