Skip to content

Commit

Permalink
Fixed stream.paused being removed from node + version bump
Browse files Browse the repository at this point in the history
  • Loading branch information
chriso committed Nov 18, 2010
1 parent 8bf2ae4 commit dca08e0
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 27 deletions.
2 changes: 1 addition & 1 deletion bin/node.io
@@ -1,3 +1,3 @@
#!/usr/bin/env node #!/usr/bin/env node


require('node.io').cli(process.argv.slice(2)); require('node.io').cli(process.argv.slice(2));
8 changes: 0 additions & 8 deletions examples/word.js

This file was deleted.

2 changes: 1 addition & 1 deletion lib/node.io/index.js
Expand Up @@ -9,7 +9,7 @@ var processor = require('./processor'),
job = require('./job'); job = require('./job');


exports = module.exports = { exports = module.exports = {
version: '0.1.0h', version: '0.1.0i',
Processor: processor.Processor, Processor: processor.Processor,
JobProto: job.JobProto, //A reference to the underlying Job.prototype JobProto: job.JobProto, //A reference to the underlying Job.prototype
JobClass: job.JobClass, //A reference to a new prototype identical to Job.prototype (so Job.prototype isn't modified) JobClass: job.JobClass, //A reference to a new prototype identical to Job.prototype (so Job.prototype isn't modified)
Expand Down
9 changes: 6 additions & 3 deletions lib/node.io/io.js
Expand Up @@ -140,7 +140,8 @@ Job.prototype.initInputStream = function(stream) {
lines: [], lines: [],
last_line: '', last_line: '',
data: false, data: false,
end: false end: false,
paused: false
} }


var end = function() { var end = function() {
Expand Down Expand Up @@ -188,13 +189,15 @@ Job.prototype.handleInputStream = function(data) {


if (this.input_stream.lines.length > 5000) { if (this.input_stream.lines.length > 5000) {
this.input_stream.stream.pause(); this.input_stream.stream.pause();
this.input_stream.paused = true;
} }
} }


Job.prototype.takeInputStreamLines = function(start, num, callback, read_id) { Job.prototype.takeInputStreamLines = function(start, num, callback, read_id) {
var self = this; var self = this;


if (this.input_stream.stream.paused && this.input_stream.lines.length <= 1000) { if (this.input_stream.paused && this.input_stream.lines.length <= 2000) {
this.input_stream.paused = false;
this.input_stream.stream.resume(); this.input_stream.stream.resume();
} }


Expand Down
24 changes: 11 additions & 13 deletions lib/node.io/processor.js
Expand Up @@ -222,7 +222,7 @@ Processor.prototype.handleMessage = function(data) {
this.output(data.job, data.output); this.output(data.job, data.output);
break; break;


case 'pull': case 'pull':
this.jobs[data.job].pull_requests++; this.jobs[data.job].pull_requests++;
this.pullInput(data.job, data.id); this.pullInput(data.job, data.id);
break; break;
Expand Down Expand Up @@ -382,7 +382,7 @@ Processor.prototype.pullInput = function (job, forWorker) {
} }


var handleInput = function(input) { var handleInput = function(input) {

//No input? we're done. //No input? we're done.
if (typeof input === 'undefined' || input === null || input === false) { if (typeof input === 'undefined' || input === null || input === false) {


Expand Down Expand Up @@ -414,7 +414,7 @@ Processor.prototype.pullInput = function (job, forWorker) {
self.input(job, input, forWorker); self.input(job, input, forWorker);
} }
} }

if (pull === 0) { if (pull === 0) {


handleInput(false); handleInput(false);
Expand All @@ -424,7 +424,7 @@ Processor.prototype.pullInput = function (job, forWorker) {
var offset = j.offset; var offset = j.offset;


j.offset += pull; j.offset += pull;

var input = j.obj.input(offset, pull, handleInput); var input = j.obj.input(offset, pull, handleInput);
if (typeof input !== 'undefined') handleInput(input); if (typeof input !== 'undefined') handleInput(input);
} }
Expand Down Expand Up @@ -580,33 +580,31 @@ Processor.prototype.output = function(job, output) {


Processor.prototype.instanceSpawn = function(job) { Processor.prototype.instanceSpawn = function(job) {
var self = this, j = this.jobs[job]; var self = this, j = this.jobs[job];

//Spawn up to `options.max` instances //Spawn up to `options.max` instances
var num = j.options.max - j.instances; var num = j.options.max - j.instances;
while(j.input.length && num--) { while(j.input.length && num--) {
self.instanceStart(job); self.instanceStart(job);
} }

if (ready_to_request_input) { if (ready_to_request_input) {


self.pullInput(job); self.pullInput(job);


} else if (!isMaster && j.input.length === 0) { } else if (!isMaster && !j.isComplete && j.input.length === 0) {


//We might be done, add another check on the nextTick //We might be done, add another check on the nextTick
process.nextTick(function() { process.nextTick(function() {
if (j.input.length === 0) { if (j.input.length === 0 && !j.isComplete) {
var output; var output;
if (j.output.length) { if (j.output.length) {
output = j.output; output = j.output;
j.output = []; j.output = [];
j.output_buffer = 0; j.output_buffer = 0;
} }

master.send({type:'complete',job:job,id:id,output:output}); master.send({type:'complete',job:job,id:id,output:output});


//console.log('Sending complete '+(x++));

self.jobComplete(job); self.jobComplete(job);
} }
}); });
Expand Down Expand Up @@ -676,7 +674,7 @@ Processor.prototype.instanceRetry = function(job, input) {


Processor.prototype.instanceComplete = function(job, result) { Processor.prototype.instanceComplete = function(job, result) {
var self = this, j = this.jobs[job]; var self = this, j = this.jobs[job];

j.instances--; j.instances--;


if (typeof result !== 'undefined' && result !== null) { if (typeof result !== 'undefined' && result !== null) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{ "name" : "node.io", { "name" : "node.io",
"description" : "A distributed data scraping and processing framework for node.js", "description" : "A distributed data scraping and processing framework for node.js",
"version" : "0.1.0h", "version" : "0.1.0i",
"homepage" : "http://github.com/chriso/node.io", "homepage" : "http://github.com/chriso/node.io",
"keywords" : ["data","mapreduce","map","reduce","scraping","html","parsing","parse","scrape","process","processing","data"], "keywords" : ["data","mapreduce","map","reduce","scraping","html","parsing","parse","scrape","process","processing","data"],
"author" : "Chris O'Hara <cohara87@gmail.com>", "author" : "Chris O'Hara <cohara87@gmail.com>",
Expand Down

0 comments on commit dca08e0

Please sign in to comment.