Browse files

add async queue to make sure too many files dont get opened at once

  • Loading branch information...
1 parent dd9d8c8 commit b87233f48fda50853bc1c272de27542e202c5c6d @lancejpollard committed Nov 16, 2011
View
12 lib/design.io/listener.js
@@ -10,17 +10,25 @@
Listener = (function() {
function Listener(root, callback) {
- var directories, files, path, paths, source, stat, _i, _len;
+ var directories, files, path, paths, self, source, stat, _i, _len;
this.root = root;
this.directories = directories = {};
this.files = files = {};
paths = require('findit').sync(root);
+ self = this;
for (_i = 0, _len = paths.length; _i < _len; _i++) {
source = paths[_i];
stat = File.stat(source);
path = _path.join(root, source.replace(root, ""));
if (!stat.isDirectory()) {
files[path] = stat;
+ try {
+ callback.call(self, File.relativePath(path), {
+ action: "initialize"
+ });
+ } catch (error) {
+ console.log(error.stack);
+ }
} else {
directories[path] = File.entries(path);
}
@@ -85,7 +93,7 @@
try {
return callback.call(this, path, options);
} catch (error) {
- return _console.error(error.message);
+ return console.log(error.stack);
}
};
View
11 lib/design.io/listener/mac.js
@@ -10,12 +10,19 @@
function Mac(pathfinder, callback) {
var command, self;
- Mac.__super__.constructor.call(this, pathfinder);
+ Mac.__super__.constructor.call(this, pathfinder, callback);
self = this;
command = spawn('ruby', ["" + __dirname + "/mac.rb"]);
command.stdout.setEncoding('utf8');
command.stdout.on('data', function(data) {
- return self.changed(data, callback);
+ var path, _i, _len, _results;
+ data = JSON.parse(data);
+ _results = [];
+ for (_i = 0, _len = data.length; _i < _len; _i++) {
+ path = data[_i];
+ _results.push(self.changed(path.slice(0, -1), callback));
+ }
+ return _results;
});
command.stdout.setEncoding('utf8');
command.stderr.on('data', function(data) {
View
4 lib/design.io/process.js
@@ -11,9 +11,7 @@
server = spawn("node", ["" + __dirname + "/server", "--watchfile", command.program.watchfile, "--directory", command.program.directory, "--port", command.program.port]);
- server.stdout.on('data', function(data) {
- return console.log(data.toString().trim());
- });
+ server.stdout.on('data', function(data) {});
server.stderr.on('data', function(data) {
return console.log(data.toString().trim());
View
92 lib/design.io/watcher.js
@@ -1,12 +1,14 @@
(function() {
- var Pathfinder, Shift, Watcher, fs, path, request, uuid;
+ var Pathfinder, Shift, Watcher, async, fs, path, request, uuid;
fs = require('fs');
path = require('path');
uuid = require('node-uuid');
+ async = require('async');
+
Shift = require('shift');
request = require('request');
@@ -102,29 +104,49 @@
}
};
- Watcher.changed = function(path, options) {
- var action, timestamp, watcher, watchers, _i, _len, _results;
- if (options == null) options = {};
+ Watcher.queue = async.queue(function(change, callback) {
+ return Watcher.change(change.path, change.options, callback);
+ }, 1);
+
+ Watcher.change = function(path, options, callback) {
+ var action, iterator, timestamp, watchers;
watchers = this.all();
action = options.action;
timestamp = options.timestamp;
- _results = [];
- for (_i = 0, _len = watchers.length; _i < _len; _i++) {
- watcher = watchers[_i];
+ iterator = function(watcher, next) {
if (watcher.match(path)) {
watcher.path = path;
watcher.action = action;
watcher.timestamp = timestamp;
try {
- _results.push(!!watcher[action](path, options));
+ switch (watcher[action].length) {
+ case 1:
+ throw Error("You must specify a callback in your watcher");
+ break;
+ case 2:
+ return watcher[action].call(watcher, path, next);
+ case 3:
+ return watcher[action].call(watcher, path, options, next);
+ }
} catch (error) {
- _results.push(_console.error(error.toString()));
+ console.log(error.stack);
+ return next();
}
} else {
- _results.push(void 0);
+ return next();
}
- }
- return _results;
+ };
+ return async.forEachSeries(watchers, iterator, function(error) {
+ return process.nextTick(callback);
+ });
+ };
+
+ Watcher.changed = function(path, options) {
+ if (options == null) options = {};
+ return this.queue.push({
+ path: path,
+ options: options
+ });
};
Watcher.log = function(data) {
@@ -144,7 +166,7 @@
try {
_results.push(!!server[action](data));
} catch (error) {
- _results.push(_console.error(error.toString()));
+ _results.push(console.log(error.stack));
}
} else {
_results.push(void 0);
@@ -153,8 +175,9 @@
return _results;
};
- Watcher.broadcast = function(action, data) {
- var params, replacer;
+ Watcher.broadcast = function(action, data, callback) {
+ var params, replacer, self;
+ self = this;
replacer = this.replacer;
params = {
url: "" + this.url + "/design.io/" + action,
@@ -166,12 +189,14 @@
};
return request(params, function(error, response, body) {
if (!error && response.statusCode === 200) {
+ if (callback) callback.call(self, null, response);
return true;
} else {
- if (error) {
- return _console.error(error.toString());
+ error = error ? error.stack : response.body;
+ if (callback) {
+ return callback.call(self, error, null);
} else {
- return _console.error(response.body);
+ return console.log(error);
}
}
});
@@ -197,31 +222,34 @@
if (this.hasOwnProperty("server")) this.server.watcher = this;
}
- Watcher.prototype.create = function(path) {
- return this.update(path);
+ Watcher.prototype.initialize = function(path, callback) {};
+
+ Watcher.prototype.create = function(path, callback) {
+ return this.update(path, callback);
};
- Watcher.prototype.update = function(path) {
+ Watcher.prototype.update = function(path, callback) {
var self;
self = this;
return fs.readFile(path, 'utf-8', function(error, result) {
if (error) return self.error(error);
return self.broadcast({
body: result
- });
+ }, callback);
});
};
- Watcher.prototype.destroy = function() {
- return this.broadcast();
+ Watcher.prototype.destroy = function(path, callback) {
+ return this.broadcast(callback);
};
Watcher.prototype.updateAll = function() {
return Watcher.update();
};
- Watcher.prototype.error = function(error) {
- _console.error(error.hasOwnProperty("message") ? error.message : error.toString());
+ Watcher.prototype.error = function(error, callback) {
+ require('util').puts(error.stack);
+ if (callback) callback();
return false;
};
@@ -237,14 +265,20 @@
};
Watcher.prototype.broadcast = function() {
- var action, args, data;
+ var action, args, callback, data;
args = Array.prototype.slice.call(arguments, 0, arguments.length);
- data = args.pop() || {};
+ callback = args.pop();
+ if (typeof callback === "function") {
+ data = args.pop() || {};
+ } else {
+ data = callback;
+ callback = null;
+ }
data.action || (data.action = this.action);
data.path || (data.path = this.path);
data.id = this.id;
action = args.shift() || "exec";
- return this.constructor.broadcast(action, data);
+ return this.constructor.broadcast(action, data, callback);
};
Watcher.prototype.toJSON = function() {
View
2 package.json
@@ -1,6 +1,6 @@
{
"name": "design.io",
- "version": "0.2.3",
+ "version": "0.2.5",
"description": "Design and Test Your App in Real-Time from TextMate",
"homepage": "http://github.com/viatropos/design.io",
"main": "lib/design.io.js",
View
8 src/design.io/listener.coffee
@@ -8,12 +8,16 @@ class Listener
@directories = directories = {}
@files = files = {}
paths = require('findit').sync(root)
-
+ self = @
for source in paths
stat = File.stat(source)
path = _path.join(root, source.replace(root, ""))
unless stat.isDirectory()
files[path] = stat
+ try
+ callback.call(self, File.relativePath(path), action: "initialize")
+ catch error
+ console.log error.stack
else
directories[path] = File.entries(path)
@@ -68,7 +72,7 @@ class Listener
try
callback.call(@, path, options)
catch error
- _console.error error.message
+ console.log error.stack
require './listener/mac'
require './listener/polling'
View
6 src/design.io/listener/mac.coffee
@@ -3,15 +3,17 @@
class Mac extends (require('../listener'))
constructor: (pathfinder, callback) ->
- super(pathfinder)
+ super(pathfinder, callback)
self = @
command = spawn 'ruby', ["#{__dirname}/mac.rb"]
command.stdout.setEncoding('utf8')
command.stdout.on 'data', (data) ->
+ data = JSON.parse(data)
# console.log(data.toString().trim())
- self.changed(data, callback)
+ for path in data
+ self.changed(path[0..-2], callback)
command.stdout.setEncoding('utf8')
command.stderr.on 'data', (data) ->
_console.error data.toString().trim()
View
2 src/design.io/listener/mac.rb
@@ -5,6 +5,6 @@
io = STDOUT
directory = STDIN.read
fsevent.watch directory do |directories|
- io.write directories[0][0..-2]
+ io.write directories
end
fsevent.run
View
2 src/design.io/process.coffee
@@ -12,6 +12,6 @@ server = spawn "node", [
"--port", command.program.port
]
server.stdout.on 'data', (data) ->
- console.log data.toString().trim()
+ # console.log data.toString().trim()
server.stderr.on 'data', (data) ->
console.log data.toString().trim()
View
77 src/design.io/watcher.coffee
@@ -1,6 +1,7 @@
fs = require 'fs'
path = require 'path'
uuid = require 'node-uuid'
+async = require 'async'
Shift = require 'shift'
request = require 'request'
Pathfinder = require 'pathfinder'
@@ -82,28 +83,44 @@ class Watcher
eval(value)
else
value
-
- @changed: (path, options = {}) ->
+
+ @queue: async.queue((change, callback) ->
+ Watcher.change(change.path, change.options, callback)
+ , 1)
+
+ @change: (path, options, callback) ->
watchers = @all()
action = options.action
timestamp = options.timestamp
- for watcher in watchers
+ iterator = (watcher, next) ->
if watcher.match(path)
watcher.path = path
watcher.action = action
watcher.timestamp = timestamp
try
- !!watcher[action](path, options)
+ switch watcher[action].length
+ when 1 then throw Error("You must specify a callback in your watcher")
+ when 2 then watcher[action].call(watcher, path, next)
+ when 3 then watcher[action].call(watcher, path, options, next)
catch error
- _console.error error.toString()
+ console.log(error.stack)
+ next()
# make async
# delete watcher.path
# delete watcher.action
# delete watcher.timestamp
#break unless success
+ else
+ next()
+
+ async.forEachSeries watchers, iterator, (error) ->
+ process.nextTick(callback)
+
+ @changed: (path, options = {}) ->
+ @queue.push path: path, options: options
@log: (data) ->
watchers = @all()
@@ -122,11 +139,13 @@ class Watcher
try
!!server[action](data)
catch error
- _console.error error.toString()
+ console.log(error.stack)
+
- @broadcast: (action, data) ->
- replacer = @replacer
- params =
+ @broadcast: (action, data, callback) ->
+ self = @
+ replacer = @replacer
+ params =
url: "#{@url}/design.io/#{action}"
method: "POST"
body: JSON.stringify(data, replacer)
@@ -135,13 +154,15 @@ class Watcher
request params, (error, response, body) ->
if !error && response.statusCode == 200
- #console.log(body)
+ callback.call(self, null, response) if callback
true
else
- if error
- _console.error error.toString()
+ error = if error then error.stack else response.body
+
+ if callback
+ callback.call(self, error, null)
else
- _console.error response.body
+ console.log(error)
constructor: ->
args = Array.prototype.slice.call(arguments, 0, arguments.length)
@@ -156,29 +177,33 @@ class Watcher
@id ||= uuid()
@server.watcher = @ if @hasOwnProperty("server")
+
+ initialize: (path, callback) ->
# Example:
#
# create: (path) ->
# ext = RegExp.$1
- create: (path) ->
- @update(path)
+ create: (path, callback) ->
+ @update(path, callback)
- update: (path) ->
+ update: (path, callback) ->
self = @
fs.readFile path, 'utf-8', (error, result) ->
return self.error(error) if error
- self.broadcast body: result
+ self.broadcast body: result, callback
- destroy: ->
- @broadcast()
+ destroy: (path, callback) ->
+ @broadcast(callback)
updateAll: ->
Watcher.update()
- error: (error) ->
- _console.error if error.hasOwnProperty("message") then error.message else error.toString()
+ error: (error, callback) ->
+ #_console.error if error.hasOwnProperty("message") then error.message else error.toString()
+ require('util').puts(error.stack)
+ callback() if callback
false
match: (path) ->
@@ -191,13 +216,17 @@ class Watcher
# send data to all browsers
broadcast: ->
args = Array.prototype.slice.call(arguments, 0, arguments.length)
- data = args.pop() || {}
+ callback = args.pop()
+ if typeof(callback) == "function"
+ data = args.pop() || {}
+ else
+ data = callback
+ callback = null
data.action ||= @action
data.path ||= @path
data.id = @id
action = args.shift() || "exec"
-
- @constructor.broadcast action, data
+ @constructor.broadcast action, data, callback
toJSON: ->
data =

0 comments on commit b87233f

Please sign in to comment.