Skip to content

Commit

Permalink
parallel-stream for puts
Browse files Browse the repository at this point in the history
  • Loading branch information
rclark committed Dec 22, 2014
1 parent f996019 commit 42d3815
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 54 deletions.
101 changes: 53 additions & 48 deletions lib/stream-put.js
Expand Up @@ -4,71 +4,76 @@ var Info = require('./stream-util').Info;
var stream = require('stream');
var util = require('util');
var multiwrite = require('./stream-util').multiwrite;
var setConcurrency = require('./stream-util').setConcurrency;
var Parallel = require('parallel-stream');

module.exports = Put;
util.inherits(Put, stream.Writable);
util.inherits(Put, Parallel);

function Put(source, options) {
if (!source) throw new TypeError('Tilesource required');
this.source = source;
this.stats = new Stats();

// If source has no startWriting function, skip lazy initialization step.
this.startWriting = source.startWriting === undefined;
this._writing = 0;
// No-op if the source doesn't provide start/stop writing functions
this.startWriting = source.startWriting ?
source.startWriting.bind(source) :
function(cb) { return cb(); };
this.stopWriting = source.stopWriting ?
source.stopWriting.bind(source) :
function(cb) { return cb(); };

this.stats = new Stats();
var stream = this;

var s = this;
// Because the writestream uses concurrent/multiwriting emit a special
// 'stop' event which indicates stopWriting and all queued writes have
// completed after the 'finish' event.
this.on('finish', function() {
return s._multiwriting ? s.once('_writeEmpty', stop) : stop();
function stop() { return source.stopWriting ? source.stopWriting(done) : done(); }
function done(err) {
if (err) {
s.emit('error', err);
} else {
s.emit('stop');
}
}
this.on('end', function() {
stream.stopWriting(function(err) {
if (err) return stream.emit('error', err);
stream.emit('stop');
});
});

this.on('unpipe', function() {
return s._multiwriting ? s.once('_writeEmpty', stop) : stop();
function stop() { return source.stopWriting ? source.stopWriting(done) : done(); }
function done(err) { if (err) s.emit('error', err); }
if (stream.writing) {
stream.stopWriting(function(err) {
if (err) stream.emit('error', err);
});
}
});

this.once('startup', function() {
stream.startWriting(function(err) {
if (err) return stream.emit('error', err);
stream.writing = true;
});
});

stream.Writable.call(this, { objectMode:true });
Parallel.call(this, setConcurrency(), { objectMode:true });
}

Put.prototype._write = function(obj, encoding, callback) {
Put.prototype._process = function(obj, encoding, callback) {
var stream = this;

// Lazily call startWriting JIT.
if (!stream.startWriting) return stream.source.startWriting(function(err) {
if (err) return callback(err);
stream.startWriting = true;
stream._write(obj, encoding, callback);
});
if (!stream.writing) {
stream.emit('startup');
return setImmediate(stream._process.bind(stream), obj, encoding, callback);
}

multiwrite(stream, callback, function write(done) {
if (obj instanceof Tile) {
stream.stats.ops++;
return stream.source.putTile(obj.z, obj.x, obj.y, obj.buffer, function(err) {
if (err) return done(err);
stream.stats.done++;
done();
});
}
if (obj instanceof Info) {
stream.stats.ops++;
return stream.source.putInfo(obj, function(err) {
if (err) return done(err);
stream.stats.done++;
done();
});
}
});
};
if (obj instanceof Tile) {
stream.stats.ops++;
return stream.source.putTile(obj.z, obj.x, obj.y, obj.buffer, function(err) {
if (err) return callback(err);
stream.stats.done++;
callback();
});
}

if (obj instanceof Info) {
stream.stats.ops++;
return stream.source.putInfo(obj, function(err) {
if (err) return callback(err);
stream.stats.done++;
callback();
});
}
};
12 changes: 6 additions & 6 deletions lib/tilelive.js
Expand Up @@ -9,6 +9,7 @@ var EventEmitter = require('events').EventEmitter;
var stream = require('stream');
var progress = require('progress-stream');
var queue = require("queue-async");
var Parallel = require('parallel-stream');

global.tileliveProtocols = global.tileliveProtocols || {};

Expand Down Expand Up @@ -364,6 +365,7 @@ tilelive.copy = function(src, dst, options, callback) {
}

put.on('stop', callback);
put.resume();
}

function out(options, callback) {
Expand All @@ -389,19 +391,17 @@ tilelive.copy = function(src, dst, options, callback) {
tilelive.createReadStream = function(source, options) {
options = options || {};
options.type = options.type || 'scanline';
var result = new tilelive.streamTypes[options.type](source, options);
if (!(result instanceof stream.Readable))
if (['list', 'scanline', 'pyramid'].indexOf(options.type) === -1)
throw new Error(options.type + ' is not a valid readable stream type');
return result;
return new tilelive.streamTypes[options.type](source, options);
};

tilelive.createWriteStream = function(source, options) {
options = options || {};
options.type = options.type || 'put';
var result = new tilelive.streamTypes[options.type](source, options);
if (!(result instanceof stream.Writable))
if ([ 'put' ].indexOf(options.type) === -1)
throw new Error(options.type + ' is not a valid writable stream type');
return result;
return new tilelive.streamTypes[options.type](source, options);
};

tilelive.stream = require('./stream-util');
Expand Down
3 changes: 3 additions & 0 deletions test/stream-deserialize.test.js
Expand Up @@ -128,6 +128,7 @@ test('deserialize: put', function(t) {
});

get.pipe(tilelive.serialize()).pipe(tilelive.deserialize()).pipe(put);
put.resume();
});

test('deserialize: verify put', function(t) {
Expand Down Expand Up @@ -176,6 +177,8 @@ test('de/serialize: round-trip', function(t) {
.pipe(deserialize)
.pipe(final)
.on('stop', makeAssertions);

final.resume();
});
});

Expand Down
2 changes: 2 additions & 0 deletions test/stream-list.test.js
Expand Up @@ -40,6 +40,7 @@ test('list: pipe', function(t) {
t.end();
});
file.pipe(get).pipe(put);
put.resume();
});

test('list: no new-line at end of stream', function(t) {
Expand Down Expand Up @@ -108,6 +109,7 @@ test('list: concurrency', function(t) {
t.deepEqual(get.stats, { ops:77, total: 77, skipped: 38, done: 77 });
t.end();
});
put.resume();
});

test('list: split into jobs', function(t) {
Expand Down
2 changes: 2 additions & 0 deletions test/stream-pyramid.test.js
Expand Up @@ -39,6 +39,7 @@ test('pyramid: pipe', function(t) {
t.deepEqual(get.stats, { ops:285, total: 285, skipped: 0, done: 285 });
t.end();
});
put.resume();
});

test('pyramid: vacuum', function(t) {
Expand Down Expand Up @@ -89,6 +90,7 @@ test('pyramid: concurrency', function(t) {
t.deepEqual(get.stats, { ops:45, total: 85, skipped: 42, done: 85 });
t.end();
});
put.resume();
});

test('pyramid: split into jobs', function(t) {
Expand Down
2 changes: 2 additions & 0 deletions test/stream-scanline.test.js
Expand Up @@ -39,6 +39,7 @@ test('scanline: pipe', function(t) {
t.deepEqual(get.stats, { ops:285, total: 285, skipped: 0, done: 285 });
t.end();
});
put.resume();
});

test('scanline: vacuum', function(t) {
Expand Down Expand Up @@ -89,6 +90,7 @@ test('scanline: concurrency', function(t) {
t.deepEqual(get.stats, { ops: 85, total: 85, skipped: 42, done: 85 });
t.end();
});
put.resume();
});

test('scanline: split into jobs', function(t) {
Expand Down
3 changes: 3 additions & 0 deletions test/stream-write.test.js
Expand Up @@ -24,6 +24,7 @@ test('write: slowput', function(t) {
t.equal(true, slow.stopped, 'dst source stopped');
t.end();
});
put.resume();
});

test('write: unpipe', function(t) {
Expand All @@ -39,6 +40,7 @@ test('write: unpipe', function(t) {
t.end();
}, 100);
}, 20);
put.resume();
});

test('write: emptymax', function(t) {
Expand All @@ -50,4 +52,5 @@ test('write: emptymax', function(t) {
put.on('stop', function() {
t.end();
});
put.resume();
});

0 comments on commit 42d3815

Please sign in to comment.