Skip to content

Commit

Permalink
Further improvements:
Browse files Browse the repository at this point in the history
 - Created a method `_getStreamableSocket` that extends the pause/resume behaviour of a socket to workaround Nodejs 0.6 bugs
 - Tests now use `concatStream` form Ftp, not their own. That uncovered a potential bug.
  • Loading branch information
sergi committed Nov 20, 2012
1 parent 7b84577 commit 9782f1e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 90 deletions.
116 changes: 64 additions & 52 deletions lib/jsftp.js
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,19 @@ Ftp.getPasvPort = function(text) {
}, callback);
};

/**
* Returns a socket for a get (RETR) on a path. The socket is ready to be
* streamed, but it is returned in a paused state. Itis left to the user to
* resume it.
*
* @param path {String} Path to the file to be retrieved
* @param callback {Function} Function to call when finalized, with the socket as a parameter
*/
this.getGetSocket = function(path, callback) {
var self = this;
this.getPasvSocket(function(err, socket) {
if (err) return callback(err);

socket.pause();
self._enqueueCmd("retr " + path, function(err, res) {
callback(err, socket);
Expand Down Expand Up @@ -565,6 +573,53 @@ Ftp.getPasvPort = function(text) {
});
};

/**
* Unfortunately node 0.6 has some serious problems with streams, namely that
* we can't count on socket.pause() actually pausing the stream, so we expand
* its behavior to buffer any data that is transmitted between pausing and
* resuming.
*
* @param port {Number} Socket port
* @param host {String} Socket host
* @returns {Net.socket}
*/
this._getStreamableSocket = function(port, host) {
var socket = Net.createConnection(port, host);
var buffer = [];
var pause = socket.pause;
var pauseCount = 0;

socket.pause = function() {
if (++pauseCount !== 1) return;

pause.call(this);
socket.on("data", onData);
socket.on("end", onEnd);
socket.on("close", onClose);
socket.on("error", onError);
};
function onData(chunk) { buffer.push(["data", chunk]); }
function onEnd() { buffer.push(["end"]); }
function onClose(hadError) { buffer.push(["close", hadError]); }
function onError(err) { buffer.push(["error", err]); }

var resume = socket.resume;
socket.resume = function() {
if (pauseCount === 0) return;
if (--pauseCount !== 0) return;

this.removeListener("data", onData);
this.removeListener("end", onEnd);
this.removeListener("close", onClose);
this.removeListener("error", onError);

buffer.forEach(socket.emit.bind(socket));
resume.call(this);
};

return socket;
};

this.getPasvSocket = function(callback) {
var self = this;
var doPasv = function(err, res) {
Expand All @@ -581,53 +636,10 @@ Ftp.getPasvPort = function(text) {
if (port === false)
return callback(new Error("PASV: Bad port"));

var socket = Net.createConnection(port, self.host);

var buffer = [];
var pause = socket.pause;
var pauseCount = 0;
socket.pause = function() {
if (++pauseCount !== 1)
return;
pause.call(this);
socket.on("data", onData);
socket.on("end", onEnd);
socket.on("close", onClose);
socket.on("error", onError);
}
function onData(chunk) {
buffer.push(["data", chunk]);
}
function onEnd() {
buffer.push(["end"]);
}
function onClose(hadError) {
buffer.push(["close", hadError]);
}
function onError(err) {
buffer.push(["error", err]);
}
var resume = socket.resume;
socket.resume = function() {
if (pauseCount == 0)
return;
if (--pauseCount !== 0)
return;
socket.removeListener("data", onData);
socket.removeListener("end", onEnd);
socket.removeListener("close", onClose);
socket.removeListener("error", onError);
buffer.forEach(function (event) {
socket.emit.apply(socket, event);
});
resume.call(this);
}

var socket = self._getStreamableSocket(port, self.host);
socket.setTimeout(self.timeout || TIMEOUT);
// Send the passive socket to the callback.
callback(null, socket);
};

self._enqueueCmd("pasv", doPasv);
};

Expand Down Expand Up @@ -665,7 +677,7 @@ Ftp.getPasvPort = function(text) {
// next passive call, if any.
socket.on("close", nextPasv);
if (options.concat)
concatStream(err, socket, callback);
Ftp._concatStream(err, socket, callback);
else
callback(err, socket);

Expand Down Expand Up @@ -751,7 +763,6 @@ Ftp.getPasvPort = function(text) {
var self = this;
self.raw.rnfr(from, function(err, res) {
if (err) return callback(err);

self.raw.rnto(to, function(err, res) { callback(err, res); });
});
};
Expand All @@ -765,7 +776,7 @@ Ftp.getPasvPort = function(text) {
};
}).call(Ftp.prototype);

function concat(bufs) {
Ftp._concat = function(bufs) {
var buffer, length = 0, index = 0;

if (!Array.isArray(bufs))
Expand All @@ -784,9 +795,9 @@ function concat(bufs) {
});

return buffer;
}
};

function concatStream(err, socket, callback) {
Ftp._concatStream = function(err, socket, callback) {
if (err) return callback(err);

var pieces = [];
Expand All @@ -795,6 +806,7 @@ function concatStream(err, socket, callback) {
if (hadError)
return callback(new Error("Socket connection error"));

callback(null, concat(pieces));
callback(null, Ftp._concat(pieces));
});
}
socket.resume();
};
41 changes: 3 additions & 38 deletions test/jsftp_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* @author Sergi Mansilla <sergi.mansilla@gmail.com>
* @license https://github.com/sergi/jsFTP/blob/master/LICENSE MIT License
*/
/*global it describe beforeEach afterEach */

"use strict";

Expand All @@ -12,7 +13,6 @@ var Fs = require("fs");
var exec = require('child_process').spawn;
var Ftp = require("../");
var Path = require("path");
var util = require('util');

// Write down your system credentials. This test suite will use OSX internal
// FTP server. If you want to test against a remote server, simply change the
Expand Down Expand Up @@ -426,7 +426,7 @@ describe("jsftp test suite", function() {
var originalData = Fs.readFileSync(path);
ftp.getGetSocket(Path.join(remoteCWD, "testfile.txt"), function(err, readable) {
assert.ok(!err);
concatStream(err, readable, function(err, buffer) {
Ftp._concatStream(err, readable, function(err, buffer) {
assert.ok(!err);
assert.equal(buffer.toString(), originalData.toString());
next();
Expand All @@ -451,7 +451,7 @@ describe("jsftp test suite", function() {
assert.ok(!err);
originalData.pipe(socket);
originalData.resume();
concatStream(err, originalData, function(err, buffer) {
Ftp._concatStream(err, originalData, function(err, buffer) {
assert.ok(!err);
Fs.readFile(path, "utf8", function(err, original) {
assert.equal(buffer.toString("utf8"), original);
Expand All @@ -468,38 +468,3 @@ describe("jsftp test suite", function() {
});
});
});

function concatStream(err, socket, callback) {
if (err) return callback(err);

var pieces = [];
socket.on("data", function(p) { pieces.push(p); });
socket.on("close", function(hadError) {
if (hadError)
return callback(new Error("Socket connection error"));

callback(null, concat(pieces));
});
socket.resume();
}

function concat(bufs) {
var buffer, length = 0, index = 0;

if (!Array.isArray(bufs))
bufs = Array.prototype.slice.call(arguments);

for (var i=0, l=bufs.length; i<l; i++) {
buffer = bufs[i];
length += buffer.length;
}

buffer = new Buffer(length);

bufs.forEach(function(buf, i) {
buf.copy(buffer, index, 0, buf.length);
index += buf.length;
});

return buffer;
}

0 comments on commit 9782f1e

Please sign in to comment.