From 7e296bf3f77feba0eaa06b06b53e8efa5199d868 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Wed, 24 Dec 2014 12:11:11 -0700 Subject: [PATCH] allow highWaterMark to be passed to write streams --- lib/stream-put.js | 4 +-- test/backpressure.check.js | 58 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 test/backpressure.check.js diff --git a/lib/stream-put.js b/lib/stream-put.js index 8b97c80..daac50b 100644 --- a/lib/stream-put.js +++ b/lib/stream-put.js @@ -39,7 +39,8 @@ function Put(source, options) { function done(err) { if (err) s.emit('error', err); } }); - stream.Writable.call(this, { objectMode:true }); + options.objectMode = true; + stream.Writable.call(this, options); } Put.prototype._write = function(obj, encoding, callback) { @@ -71,4 +72,3 @@ Put.prototype._write = function(obj, encoding, callback) { } }); }; - diff --git a/test/backpressure.check.js b/test/backpressure.check.js new file mode 100644 index 0000000..7353540 --- /dev/null +++ b/test/backpressure.check.js @@ -0,0 +1,58 @@ +var Timedsource = require('./timedsource'); +var tilelive = require('..'); +var progress = require('progress-stream'); +var util = require('util'); +var setConcurrency = require('../lib/stream-util').setConcurrency; + +var max = 9; +var readSpeed = 100; +var readsPerWrite = 10; +var numReads = 0; +var numWrites = 0; +var numDrains = 0; +var writesQueued = 0; +var writeConcurrency = 0; +var writeHighWater = 2 * setConcurrency(); + +console.log('Calculated concurrency: %s', setConcurrency()); + +function report() { + util.print( + util.format( + '\r\033[KReads: %s - Writes: %s - Queued: %s - Drains: %s - Write concurrency: %s', + numReads, numWrites, writesQueued, numDrains, writeConcurrency + ) + ); +} + +var readsource = new Timedsource({ time: readSpeed, maxzoom: max }); +var read = tilelive.createReadStream(readsource, { type: 'scanline' }); +var readProgress = progress({ + objectMode: true, + length: Math.pow(4, max), + time: 1 +}, function(prog) { + if (prog.transferred > numReads) { + numReads = prog.transferred; + report(); + } +}); + +var writesource = new Timedsource({ time: readSpeed * readsPerWrite, maxzoom: max }); +var write = tilelive.createWriteStream(writesource, { highWaterMark: writeHighWater }); +write.on('_write', function() { + numWrites++; + writeConcurrency = write._multiwriting; + writesQueued = write._writableState.buffer.length; + report(); +}); +write.on('drain', function() { + numDrains++; + writeConcurrency = write._multiwriting; + writesQueued = write._writableState.buffer.length; + report(); +}); + +read.pipe(readProgress).pipe(write).on('stop', function() { + console.log('finished'); +});