diff --git a/bin/node.io b/bin/node.io index b446ff8..163b3c8 100644 --- a/bin/node.io +++ b/bin/node.io @@ -1,3 +1,3 @@ #!/usr/bin/env node -require('node.io').cli(process.argv.slice(2)); \ No newline at end of file +require('node.io').cli(process.argv.slice(2)); diff --git a/examples/word.js b/examples/word.js deleted file mode 100644 index 279a9f1..0000000 --- a/examples/word.js +++ /dev/null @@ -1,8 +0,0 @@ -var start = require('../').start; - -var wc = require('./word_count').job.extend({benchmark:true,fork:2}, {input:'./lorem.txt'}); - -start(wc, function(err, output) { - //console.log(err); - //console.log(output); -}); diff --git a/lib/node.io/index.js b/lib/node.io/index.js index 1d5183d..ab81935 100644 --- a/lib/node.io/index.js +++ b/lib/node.io/index.js @@ -9,7 +9,7 @@ var processor = require('./processor'), job = require('./job'); exports = module.exports = { - version: '0.1.0h', + version: '0.1.0i', Processor: processor.Processor, JobProto: job.JobProto, //A reference to the underlying Job.prototype JobClass: job.JobClass, //A reference to a new prototype identical to Job.prototype (so Job.prototype isn't modified) diff --git a/lib/node.io/io.js b/lib/node.io/io.js index 7c7af0e..fdb82e5 100644 --- a/lib/node.io/io.js +++ b/lib/node.io/io.js @@ -140,7 +140,8 @@ Job.prototype.initInputStream = function(stream) { lines: [], last_line: '', data: false, - end: false + end: false, + paused: false } var end = function() { @@ -188,13 +189,15 @@ Job.prototype.handleInputStream = function(data) { if (this.input_stream.lines.length > 5000) { this.input_stream.stream.pause(); + this.input_stream.paused = true; } } Job.prototype.takeInputStreamLines = function(start, num, callback, read_id) { var self = this; - - if (this.input_stream.stream.paused && this.input_stream.lines.length <= 1000) { + + if (this.input_stream.paused && this.input_stream.lines.length <= 2000) { + this.input_stream.paused = false; this.input_stream.stream.resume(); } diff --git a/lib/node.io/processor.js b/lib/node.io/processor.js index 749741b..7abd541 100644 --- a/lib/node.io/processor.js +++ b/lib/node.io/processor.js @@ -222,7 +222,7 @@ Processor.prototype.handleMessage = function(data) { this.output(data.job, data.output); break; - case 'pull': + case 'pull': this.jobs[data.job].pull_requests++; this.pullInput(data.job, data.id); break; @@ -382,7 +382,7 @@ Processor.prototype.pullInput = function (job, forWorker) { } var handleInput = function(input) { - + //No input? we're done. if (typeof input === 'undefined' || input === null || input === false) { @@ -414,7 +414,7 @@ Processor.prototype.pullInput = function (job, forWorker) { self.input(job, input, forWorker); } } - + if (pull === 0) { handleInput(false); @@ -424,7 +424,7 @@ Processor.prototype.pullInput = function (job, forWorker) { var offset = j.offset; j.offset += pull; - + var input = j.obj.input(offset, pull, handleInput); if (typeof input !== 'undefined') handleInput(input); } @@ -580,33 +580,31 @@ Processor.prototype.output = function(job, output) { Processor.prototype.instanceSpawn = function(job) { var self = this, j = this.jobs[job]; - + //Spawn up to `options.max` instances var num = j.options.max - j.instances; while(j.input.length && num--) { self.instanceStart(job); } - + if (ready_to_request_input) { self.pullInput(job); - } else if (!isMaster && j.input.length === 0) { + } else if (!isMaster && !j.isComplete && j.input.length === 0) { //We might be done, add another check on the nextTick process.nextTick(function() { - if (j.input.length === 0) { + if (j.input.length === 0 && !j.isComplete) { var output; if (j.output.length) { output = j.output; j.output = []; j.output_buffer = 0; } - + master.send({type:'complete',job:job,id:id,output:output}); - - //console.log('Sending complete '+(x++)); - + self.jobComplete(job); } }); @@ -676,7 +674,7 @@ Processor.prototype.instanceRetry = function(job, input) { Processor.prototype.instanceComplete = function(job, result) { var self = this, j = this.jobs[job]; - + j.instances--; if (typeof result !== 'undefined' && result !== null) { diff --git a/package.json b/package.json index e62f502..a8cea6b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name" : "node.io", "description" : "A distributed data scraping and processing framework for node.js", - "version" : "0.1.0h", + "version" : "0.1.0i", "homepage" : "http://github.com/chriso/node.io", "keywords" : ["data","mapreduce","map","reduce","scraping","html","parsing","parse","scrape","process","processing","data"], "author" : "Chris O'Hara ",