Skip to content

Commit

Permalink
allow highWaterMark to be passed to write streams
Browse files Browse the repository at this point in the history
  • Loading branch information
rclark committed Dec 24, 2014
1 parent f75327f commit 7e296bf
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
4 changes: 2 additions & 2 deletions lib/stream-put.js
Expand Up @@ -39,7 +39,8 @@ function Put(source, options) {
function done(err) { if (err) s.emit('error', err); }
});

stream.Writable.call(this, { objectMode:true });
options.objectMode = true;
stream.Writable.call(this, options);
}

Put.prototype._write = function(obj, encoding, callback) {
Expand Down Expand Up @@ -71,4 +72,3 @@ Put.prototype._write = function(obj, encoding, callback) {
}
});
};

58 changes: 58 additions & 0 deletions test/backpressure.check.js
@@ -0,0 +1,58 @@
var Timedsource = require('./timedsource');
var tilelive = require('..');
var progress = require('progress-stream');
var util = require('util');
var setConcurrency = require('../lib/stream-util').setConcurrency;

var max = 9;
var readSpeed = 100;
var readsPerWrite = 10;
var numReads = 0;
var numWrites = 0;
var numDrains = 0;
var writesQueued = 0;
var writeConcurrency = 0;
var writeHighWater = 2 * setConcurrency();

console.log('Calculated concurrency: %s', setConcurrency());

function report() {
util.print(
util.format(
'\r\033[KReads: %s - Writes: %s - Queued: %s - Drains: %s - Write concurrency: %s',
numReads, numWrites, writesQueued, numDrains, writeConcurrency
)
);
}

var readsource = new Timedsource({ time: readSpeed, maxzoom: max });
var read = tilelive.createReadStream(readsource, { type: 'scanline' });
var readProgress = progress({
objectMode: true,
length: Math.pow(4, max),
time: 1
}, function(prog) {
if (prog.transferred > numReads) {
numReads = prog.transferred;
report();
}
});

var writesource = new Timedsource({ time: readSpeed * readsPerWrite, maxzoom: max });
var write = tilelive.createWriteStream(writesource, { highWaterMark: writeHighWater });
write.on('_write', function() {
numWrites++;
writeConcurrency = write._multiwriting;
writesQueued = write._writableState.buffer.length;
report();
});
write.on('drain', function() {
numDrains++;
writeConcurrency = write._multiwriting;
writesQueued = write._writableState.buffer.length;
report();
});

read.pipe(readProgress).pipe(write).on('stop', function() {
console.log('finished');
});

0 comments on commit 7e296bf

Please sign in to comment.