Skip to content

Commit

Permalink
allow highWaterMark to be passed to write streams
Browse files Browse the repository at this point in the history
  • Loading branch information
rclark committed Dec 24, 2014
1 parent f75327f commit a30525f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
4 changes: 2 additions & 2 deletions lib/stream-put.js
Expand Up @@ -39,7 +39,8 @@ function Put(source, options) {
function done(err) { if (err) s.emit('error', err); }
});

stream.Writable.call(this, { objectMode:true });
options.objectMode = true;
stream.Writable.call(this, options);
}

Put.prototype._write = function(obj, encoding, callback) {
Expand Down Expand Up @@ -71,4 +72,3 @@ Put.prototype._write = function(obj, encoding, callback) {
}
});
};

62 changes: 62 additions & 0 deletions test/backpressure.check.js
@@ -0,0 +1,62 @@
var Timedsource = require('./timedsource');
var tilelive = require('..');
var progress = require('progress-stream');
var util = require('util');
var setConcurrency = require('../lib/stream-util').setConcurrency;

var max = 9;
var readSpeed = 100;
var readsPerWrite = 5;
var numReads = 0;
var numWrites = 0;
var numDrains = 0;
var writeHighWater = 1; //2 * setConcurrency();
var startTime = Infinity;

console.log(
'Calculated concurrency: %s - Write stream high water mark: %s - Target write speed: %s/s',
setConcurrency(), writeHighWater, 60 * 1000 / (readSpeed * readsPerWrite)
);

function report() {
var writesPerSecond = Math.floor(numWrites / (Date.now() - startTime) * 1000);
var readsPerSecond = Math.floor(numReads / (Date.now() - startTime) * 1000);
util.print(
util.format(
'\r\033[KReads: %s - Writes: %s - Queued: %s - Drains: %s - Write concurrency: %s - Write speed: %s/s - Read speed: %s/s',
numReads, numWrites, write._writableState.buffer.length, numDrains, write._multiwriting, writesPerSecond, readsPerSecond
)
);
}

var readsource = new Timedsource({ time: readSpeed, maxzoom: max });
var read = tilelive.createReadStream(readsource, { type: 'scanline' });
var readProgress = progress({
objectMode: true,
length: Math.pow(4, max),
time: 1
}, function(prog) {
if (prog.transferred > numReads) {
numReads = prog.transferred;
}
});
readProgress.once('progress', function() {
startTime = Date.now();
});

var writesource = new Timedsource({ time: readSpeed * readsPerWrite, maxzoom: max });
var write = tilelive.createWriteStream(writesource, { highWaterMark: writeHighWater });
write.on('_write', function() {
numWrites++;
writeConcurrency = write._multiwriting;
});
write.on('drain', function() {
numDrains++;
writeConcurrency = write._multiwriting;
});

setInterval(report, 1);
read.pipe(readProgress).pipe(write).on('stop', function() {
console.log('finished');
process.exit(0);
});

0 comments on commit a30525f

Please sign in to comment.