Skip to content

Commit

Permalink
refactor and DRY up copy function
Browse files Browse the repository at this point in the history
  • Loading branch information
rclark committed May 29, 2015
1 parent 5d11669 commit b4e84ac
Showing 1 changed file with 21 additions and 35 deletions.
56 changes: 21 additions & 35 deletions lib/tilelive.js
Expand Up @@ -341,6 +341,7 @@ tilelive.copy = function(src, dst, options, callback) {

q.await(function(err) {
if (err) return callback(err);

copy(src, dst, options, function(err) {
if (err) return callback(err);
if (options.close) closingTime(src, dst, function(err) {
Expand All @@ -352,59 +353,44 @@ tilelive.copy = function(src, dst, options, callback) {
});

function copy(src, dst, opts, callback) {
// copy to outStream, if present. This is done after the src is loaded
if (!dst && opts.outStream) return out(opts, callback);
// sinkStream represents a copy to a non-tilelive stream
var sinkStream = !dst && opts.outStream;

// Copy
var get = tilelive.createReadStream(src, opts);
var put = tilelive.createWriteStream(dst, {retry:opts.retry});
var put = sinkStream ?
options.outStream : tilelive.createWriteStream(dst, {retry:opts.retry});

var hasErrored = false;
function handleError(err) {
if (!hasErrored) callback(err);
hasErrored = true;
}

var previous;
setInterval(function() {
if (prog.progress.transferred === previous) {
// kill somehow
}
previous = prog.progress.transferred;
}, 60000).unref();

get.on('error', handleError);
put.on('error', handleError);
get.on('length', prog.setLength);
if (options.progress) prog.on('progress', function(p) { options.progress(get.stats, p); });

if (opts.type === 'list') {
var file = opts.listStream;
file.pipe(get).pipe(prog).pipe(put);
} else {
get.pipe(prog).pipe(put);
}
if (options.progress)
prog.on('progress', function(p) { options.progress(get.stats, p); });

put.on('stop', callback);
}

function out(options, callback) {
var get = tilelive.createReadStream(src, options);
var put = options.outStream;

var hasErrored = false;
function handleError(err) {
if (!hasErrored) callback(err);
hasErrored = true;
}

get.on('error', handleError);
put.on('error', handleError);
get.on('length', prog.setLength);
if (options.progress) prog.on('progress', function(p) { options.progress(get.stats, p); });
var doneEvent = sinkStream ? 'finish' : 'stop';

if (options.outStream === process.stdout ||
options.outStream === process.stderr) prog.on('end', callback);
else put.on('finish', callback);
else
put.on(doneEvent, callback);

if (options.type === 'list') {
var file = options.listStream;
file.pipe(get).pipe(tilelive.serialize()).pipe(prog).pipe(put);
} else {
get.pipe(tilelive.serialize()).pipe(prog).pipe(put);
}
var pipeline = opts.type === 'list' ? opts.listStream.pipe(get) : get;
if (sinkStream) pipeline = pipeline.pipe(tilelive.serialize());
pipeline.pipe(prog).pipe(put);
}

function closingTime(src, dst, callback) {
Expand Down

0 comments on commit b4e84ac

Please sign in to comment.