Permalink
Browse files

Added async complete() and loading jobs from ~/.node_modules

  • Loading branch information...
1 parent ca33629 commit 6af0c2667f82c2b9eee9a4de4a2b4a4518d73e7d @chriso chriso committed Dec 8, 2010
Showing with 72 additions and 60 deletions.
  1. +1 −1 README.md
  2. +2 −2 docs/api.md
  3. +1 −0 examples/word_count.js
  4. +12 −12 lib/node.io/cli.js
  5. +4 −3 lib/node.io/io.js
  6. +38 −30 lib/node.io/process_master.js
  7. +1 −1 lib/node.io/process_worker.js
  8. +12 −10 lib/node.io/processor.js
  9. +1 −1 lib/node.io/spawn.js
View
@@ -48,7 +48,7 @@ To get started, see the [documentation](https://github.com/chriso/node.io/blob/m
Better documentation will be available once I have time to write it.
-*Note: node.io is an _ALPHA_ release. There will no doubt be some bugs and oddities.*
+*Note: node.io is a _BETA_ release. There will no doubt be some bugs and oddities.*
Check [@nodeio](http://twitter.com/nodeio) or [http://node.io/](http://node.io/) for updates.
View
@@ -239,7 +239,7 @@ Example
});
});
-There are also helper methods for setting or adding headers. Call these methods before using get, getHtml, etc.
+There are also helper methods for setting or adding headers. Call these methods before calling `get`, `getHtml`, etc.
this.setHeader(key, value);
this.setUserAgent('Firefox ...');
@@ -251,7 +251,7 @@ There are also methods to make post requests. If `body` is an object, it is enco
this.post(url, body, [headers], callback, [parse])
this.postHtml(url, body, [headers], callback, [parse])
-To make a custom request, use the lower level doRequest() method
+To make a custom request, use the lower level `doRequest` method
this.doRequest(method, url, body, [headers], callback, [parse])
@@ -35,6 +35,7 @@ var methods = {
}
//Now that we have the full list of words, output
this.output(out);
+ return true;
}
};
View
@@ -142,17 +142,16 @@ exports.cli = function (args) {
start_processor(job_path);
} else {
-
- //We apply the command line switches by creating a temporary job that extends the specified job
-
+
+ //We apply the command line switches by creating a temporary job that extends the specified job
+
var is_coffee = false;
var create_temp_job = function (job_path, callback) {
if (job_path) {
- var temp_filename = cwd + '/temp_' + path.basename(job_path, '.js') + '.js',
- this_job = (job_path.indexOf('/') >= 0 ? job_path : './' + job_path)
+ var temp_filename = '/tmp/temp_' + path.basename(job_path, '.js') + '.js';
//If we're not the master, the file has already been created
if (!isMaster) {
@@ -161,16 +160,17 @@ exports.cli = function (args) {
}
var temp_job;
-
+
if (!is_coffee) {
- temp_job = 'var job = require("' + this_job + '").job;\n\n' +
+ temp_job = 'var job = require("' + job_path + '").job;\n\n' +
'exports.job = job.extend({';
} else {
//Compatability with CoffeeScript inheritance
- temp_job = 'var JobClass = require("' + this_job + '")[\'class\'];\n\n';
+
+ temp_job = 'var JobClass = require("' + job_path + '")[\'class\'];\n\n';
temp_job += 'if (typeof JobClass === "undefined") { console.log("Please export @class when using CoffeeScript. See the documentation for more information");process.exit(1);}';
temp_job += 'var __hasProp = Object.prototype.hasOwnProperty, __extends = function(child, parent) {\n';
temp_job += 'for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; }\n';
@@ -185,7 +185,7 @@ exports.cli = function (args) {
} else if (eval) {
//We don't necessarily need a job if the eval switch is used
- var temp_filename = cwd + '/temp_job.js',
+ var temp_filename = '/tmp/temp_job.js',
temp_job = 'var Job = require("node.io").Job;\n\n' +
'exports.job = new Job({';
@@ -230,8 +230,8 @@ exports.cli = function (args) {
temp_job += '});';
//Remove the temp file when the process exits
- utils.removeOnExit(temp_filename);
-
+ //utils.removeOnExit(temp_filename);
+
fs.writeFile(temp_filename, temp_job, function(err) {
if (err) {
exit(err, true);
@@ -254,7 +254,7 @@ exports.cli = function (args) {
is_coffee = true;
var basename = path.basename(job_path, '.coffee');
- var job_compiled = cwd + '/' + basename + '_compiled.js';
+ var job_compiled = '/tmp/' + basename + '_compiled.js';
//If we're not the master, the file is already compiled
if (!isMaster) {
View
@@ -75,6 +75,7 @@ Job.prototype.outputStream = function (stream, name) {
*/
Job.prototype.inputFromFile = function (path) {
this.debug('Reading from ' + path);
+ this.in_file = path;
var stream = fs.createReadStream(path, {bufferSize: this.options.read_buffer});
this.inputStream(stream);
};
@@ -281,13 +282,13 @@ Job.prototype.handleSpecialIO = function () {
//If output is a string, assume it's a file and write to it when ouput is called
if (typeof this.output === 'string') {
- var out_path = this.output;
+ this.out_file = this.output;
- this.debug('Writing to ' + out_path);
+ this.debug('Writing to ' + this.out_file);
//Write output to the file
this.output = function (data) {
- self.write(out_path, data);
+ self.write(this.out_file, data);
};
}
@@ -240,41 +240,49 @@ Processor.prototype.setupMasterEvents = function (job, workers) {
clearTimeout(job.global_timeout);
}
- //Call job.complete() if it's defined
- if (typeof job.obj.complete === 'function') {
- job.obj.complete.call(job.obj);
- }
-
- //Output information about job running time, benchmarks, etc. if
- //the `benchmark` op is set
- var time;
- if (job.options.benchmark) {
- var mb_read, mb_written, MB = 1024 * 1024, now = new Date();
-
- var round = function (val) {
- return Math.round(val * 1000) / 1000;
- };
+ var oncomplete = function () {
+
+ //Output information about job running time, benchmarks, etc. if
+ //the `benchmark` op is set
+ var time;
+ if (job.options.benchmark) {
+ var mb_read, mb_written, MB = 1024 * 1024, now = new Date();
+
+ var round = function (val) {
+ return Math.round(val * 1000) / 1000;
+ };
+
+ time = (now - job.start_time) / 1000;
+ mb_read = round(job.obj.getBytesRead() / MB);
+ mb_written = round(job.obj.getBytesWritten() / MB);
+
+ if (mb_read) {
+ self.status(
+ 'Read ' + mb_read + 'MB (' + round(mb_read / time) + 'MB/s)'
+ , 'ok');
+ }
+
+ if (mb_written) {
+ self.status(
+ 'Wrote ' + mb_written + 'MB (' + round(mb_written / time) + 'MB/s)'
+ , 'ok');
+ }
+ }
- time = (now - job.start_time) / 1000;
- mb_read = round(job.obj.getBytesRead() / MB);
- mb_written = round(job.obj.getBytesWritten() / MB);
+ self.status('Job complete' + (time ? 'd in ' + time + 's' : ''), 'ok');
- if (mb_read) {
- self.status(
- 'Read ' + mb_read + 'MB (' + round(mb_read / time) + 'MB/s)'
- , 'ok');
- }
+ //Wait for output streams to drain, then call oncomplete
+ job.obj.waitForOutputStreamDrains(job.oncomplete);
- if (mb_written) {
- self.status(
- 'Wrote ' + mb_written + 'MB (' + round(mb_written / time) + 'MB/s)'
- , 'ok');
+ };
+
+ //Call job.complete() if it's defined
+ if (typeof job.obj.complete === 'function') {
+ var wait = job.obj.complete.apply(job.obj, [oncomplete]);
+ if (!wait) {
+ oncomplete();
}
}
- self.status('Job complete' + (time ? 'd in ' + time + 's' : ''), 'ok');
-
- //Wait for output streams to drain, then call oncomplete
- job.obj.waitForOutputStreamDrains(job.oncomplete);
});
}
@@ -78,7 +78,7 @@ Processor.prototype.setupWorkerEvents = function (job, master) {
worker.on('process', function () {
if (job.is_complete) return;
- if (job.ready_to_request_input && job.input.length < (job.options.max * job.options.take)) {
+ if (job.ready_to_request_input && job.input.length < (job.options.max * job.options.take * job.options.worker_input_mult)) {
//We already have input, but pull some more for continuity
job.ready_to_request_input = false;
@@ -98,13 +98,15 @@ exports.start = function (job, options, callback, capture_output) {
//Initialise the processor
processor.init(function () {
-
+
//Start the job
processor.startJob(job_name, job_obj, callback);
});
};
+ require.paths.unshift(process.cwd());
+
//Load the job
processor.loadJob(job, function (err, job_name, job_obj) {
if (err) {
@@ -171,7 +173,6 @@ Processor.prototype.init = function (callback) {
//Add message handlers
stream = multi.frameStream(stream, true);
stream.addListener('message', function (data) {
- //console.log('FROM CHILD: '+data);
self.handleWorkerMessage(data);
});
@@ -191,7 +192,6 @@ Processor.prototype.init = function (callback) {
//Add message handlers
stream = multi.frameStream(stream, true);
stream.addListener('message', function (data) {
- //console.log('FROM MASTER: '+data);
self.handleMasterMessage(data);
});
@@ -212,6 +212,7 @@ Processor.prototype.init = function (callback) {
* @api public
*/
Processor.prototype.loadJob = function (job, callback) {
+
if (typeof job === 'object') {
//The job is already loaded. Since we don't have a unique job
@@ -221,25 +222,26 @@ Processor.prototype.loadJob = function (job, callback) {
} else if (typeof job === 'string') {
- //Make it a full path
- if (job.indexOf('/') === -1) {
- job = process.cwd() + '/' + job;
- }
-
if (path.extname(job) !== '.coffee') {
-
+
//Let node determine the extension and load
try {
callback(null, job, require(job).job);
} catch (e) {
+ console.log(e);
callback('Failed to load job "' + job + '"');
}
} else {
+ //Make it a full path
+ if (job.indexOf('/') === -1) {
+ job = process.cwd() + '/' + job;
+ }
+
//Compile the job if it's CoffeeScript
var basename = path.basename(job, '.coffee');
- var compiled_js = process.cwd() + '/' + basename + '_compiled.js';
+ var compiled_js = '/tmp/' + basename + '_compiled.js';
if (isMaster) {
@@ -22,7 +22,7 @@ Job.prototype.spawn = function (args, stdin, callback) {
}
this.debug('Spawning "' + args + '"');
-
+
if (!(args instanceof Array)) {
args = args.split(' ');
}

0 comments on commit 6af0c26

Please sign in to comment.