From a30525f07ce3b9f2b7edaf7961943c5aeae9c9d2 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Wed, 24 Dec 2014 12:11:11 -0700 Subject: [PATCH] allow highWaterMark to be passed to write streams --- lib/stream-put.js | 4 +-- test/backpressure.check.js | 62 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 test/backpressure.check.js diff --git a/lib/stream-put.js b/lib/stream-put.js index 8b97c80..daac50b 100644 --- a/lib/stream-put.js +++ b/lib/stream-put.js @@ -39,7 +39,8 @@ function Put(source, options) { function done(err) { if (err) s.emit('error', err); } }); - stream.Writable.call(this, { objectMode:true }); + options.objectMode = true; + stream.Writable.call(this, options); } Put.prototype._write = function(obj, encoding, callback) { @@ -71,4 +72,3 @@ Put.prototype._write = function(obj, encoding, callback) { } }); }; - diff --git a/test/backpressure.check.js b/test/backpressure.check.js new file mode 100644 index 0000000..1155671 --- /dev/null +++ b/test/backpressure.check.js @@ -0,0 +1,62 @@ +var Timedsource = require('./timedsource'); +var tilelive = require('..'); +var progress = require('progress-stream'); +var util = require('util'); +var setConcurrency = require('../lib/stream-util').setConcurrency; + +var max = 9; +var readSpeed = 100; +var readsPerWrite = 5; +var numReads = 0; +var numWrites = 0; +var numDrains = 0; +var writeHighWater = 1; //2 * setConcurrency(); +var startTime = Infinity; + +console.log( + 'Calculated concurrency: %s - Write stream high water mark: %s - Target write speed: %s/s', + setConcurrency(), writeHighWater, 60 * 1000 / (readSpeed * readsPerWrite) +); + +function report() { + var writesPerSecond = Math.floor(numWrites / (Date.now() - startTime) * 1000); + var readsPerSecond = Math.floor(numReads / (Date.now() - startTime) * 1000); + util.print( + util.format( + '\r\033[KReads: %s - Writes: %s - Queued: %s - Drains: %s - Write concurrency: %s - Write speed: %s/s - Read speed: %s/s', + numReads, numWrites, write._writableState.buffer.length, numDrains, write._multiwriting, writesPerSecond, readsPerSecond + ) + ); +} + +var readsource = new Timedsource({ time: readSpeed, maxzoom: max }); +var read = tilelive.createReadStream(readsource, { type: 'scanline' }); +var readProgress = progress({ + objectMode: true, + length: Math.pow(4, max), + time: 1 +}, function(prog) { + if (prog.transferred > numReads) { + numReads = prog.transferred; + } +}); +readProgress.once('progress', function() { + startTime = Date.now(); +}); + +var writesource = new Timedsource({ time: readSpeed * readsPerWrite, maxzoom: max }); +var write = tilelive.createWriteStream(writesource, { highWaterMark: writeHighWater }); +write.on('_write', function() { + numWrites++; + writeConcurrency = write._multiwriting; +}); +write.on('drain', function() { + numDrains++; + writeConcurrency = write._multiwriting; +}); + +setInterval(report, 1); +read.pipe(readProgress).pipe(write).on('stop', function() { + console.log('finished'); + process.exit(0); +});