Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Implement proper streaming for passive commands and add tests #31

Merged
merged 21 commits into from

2 participants

@sergi
Owner

This fix gives proper Nodejs streaming capabilities to FTP. Until now, jsftp relied on buffering, although it had streaming methods ready to be used (getPutSocket and getGetSocket). However, upon some testing with vfs-ftp I found out that piping wouldn't work, and it would never do because of the way that PASV commands were handled at the moment. That is, sequentially.

Passive command handling methods have been now split into two, getPasvSocket and orderedPasv. The latter uses the former, and now the user of the API can choose what API to use (the streaming API is less straightforward). In this way, getPutSocket and getGetSocket can directly use the methods that allow them to do multiple requests and deal with the returned sockets (streams). The implementation is cleaner now.

In case you were wondering why not get rid of the sequential API, it is because the sequential API is useful for when the user wants to ensure that an FTP command will always be executed after another, but still write it in procedural style (not callback-style). That was needed for the way jsDAV calles different fs functions, which necessarily doesn't compose them and behave independently. Check out the test "test multiple concurrent pasvs" as an example.

A bit more (and a bit duplicated) information
This is a big rehaul of how streaming passive requests are handled.

Improved:

  • Before, streams were not properly resumed and paused. Resulting on timeouts in piping streams, for example.
  • Split passive function into two. getPasvSocket and orderedPasv. The former retrieves the passive socket and the latter is for PASV calls that have to be synchronous, like in

    ftp.put("file1", callback1);
    ftp.put("file2", callback2);
    ...
    
  • VFS will be using only streaming functions, of course.
  • The PASV call now goes through the normal flow that includes initialisation, auth, etc. Before a PASV call on connecting to the Ftp would fail because auth didn't happen first.
  • Other small but important changes.

The diff should be pretty self-explanatory and I of course added new unit tests for the changes.

fjakobs and others added some commits
@fjakobs fjakobs don't use submodules
fix unit tests
9f5806c
@sergi Merge upstream 2e4a48c
@sergi Merge upstream 7ee73ef
@sergi Changed code to continuation-style to avoid potential bugs a0e7bdf
@sergi No need for util dependency 1a14059
Zef Hemel Merge pull request #2 from ajaxorg/fix_continuation
Changed code to continuation-style
a754f34
@sergi User-passed 'onConnect' function was ignored 04f03d8
@sergi Merge upstream 6f489cc
@sergi Bump version to 0.4.9 f65832e
Zef Hemel Merge pull request #3 from ajaxorg/fix_onconnect
Fix execution of the custom `onConnect` handler
c3dee03
@sergi Merge upstream 9fe4b67
@sergi Implement proper streaming for passive commands ac50f2a
@sergi If there is no callback passed to enqueueCmd, it uses an empty one 2442168
@sergi Allow the user to configure the timeout for ftp commands aa7329a
@sergi Make one single call to switch the transfer type to binary, instead o…
…f on each passive call.
6dbe9ae
@sergi Fix streaming commands
Now it pauses and resumes properly so no data is possibly lost, and pasv commands also use `_enqueueCmd` instead of `enqueue` so that they go through the traditional auth flow.
dc5d1ac
@sergi Better naming 6ce72ce
@fjakobs fjakobs make sure to buffer events in pause 7b84577
@sergi Further improvements:
 - Created a method `_getStreamableSocket` that extends the pause/resume behaviour of a socket to workaround Nodejs 0.6 bugs
 - Tests now use `concatStream` form Ftp, not their own. That uncovered a potential bug.
9782f1e
@fjakobs fjakobs Merge pull request #4 from ajaxorg/fix_streaming
Implement proper streaming capabilities for passive commands
486b017
Sergi Mansilla Bump jsftp version to 0.5 f44ae03
@sergi sergi merged commit d13451b into sergi:master

1 check passed

Details default The Travis build passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 7, 2012
  1. @fjakobs

    don't use submodules

    fjakobs authored
    fix unit tests
Commits on Nov 11, 2012
  1. Merge upstream

    authored
  2. Merge upstream

    authored
  3. No need for util dependency

    authored
Commits on Nov 13, 2012
  1. Merge pull request #2 from ajaxorg/fix_continuation

    Zef Hemel authored
    Changed code to continuation-style
Commits on Nov 14, 2012
  1. Merge upstream

    authored
Commits on Nov 15, 2012
  1. Bump version to 0.4.9

    authored
  2. Merge pull request #3 from ajaxorg/fix_onconnect

    Zef Hemel authored
    Fix execution of the custom `onConnect` handler
  3. Merge upstream

    authored
Commits on Nov 19, 2012
  1. Fix streaming commands

    authored
    Now it pauses and resumes properly so no data is possibly lost, and pasv commands also use `_enqueueCmd` instead of `enqueue` so that they go through the traditional auth flow.
  2. Better naming

    authored
Commits on Nov 20, 2012
  1. @fjakobs
  2. Further improvements:

    authored
     - Created a method `_getStreamableSocket` that extends the pause/resume behaviour of a socket to workaround Nodejs 0.6 bugs
     - Tests now use `concatStream` form Ftp, not their own. That uncovered a potential bug.
  3. @fjakobs

    Merge pull request #4 from ajaxorg/fix_streaming

    fjakobs authored
    Implement proper streaming capabilities for passive commands
  4. Bump jsftp version to 0.5

    Sergi Mansilla authored
This page is out of date. Refresh to see the latest.
Showing with 290 additions and 124 deletions.
  1. +184 −104 lib/jsftp.js
  2. +1 −1  package.json
  3. +105 −19 test/jsftp_test.js
View
288 lib/jsftp.js
@@ -60,7 +60,7 @@ var Ftp = module.exports = function(cfg) {
if (DEBUG_MODE) {
var self = this;
["command", "response", "connect", "reconnect", "disconnect"].forEach(function(event) {
- self.emitter.on(event, self.log);
+ self.emitter.on(event, console.log);
});
}
@@ -190,11 +190,12 @@ Ftp.getPasvPort = function(text) {
completeCmd += " " + args.join(" ");
}
- self._enqueueCmd(completeCmd.trim(), callback || function(){});
+ self._enqueueCmd(completeCmd.trim(), callback);
};
};
this._enqueueCmd = function(action, callback) {
+ if (!callback) callback = function(){};
if (this.socket && this.socket.writable) {
this._authAndEnqueue(action, callback);
}
@@ -239,7 +240,7 @@ Ftp.getPasvPort = function(text) {
this.socket = Net.createConnection(port, host);
var self = this;
- this.socket.setTimeout(TIMEOUT, function() {
+ this.socket.setTimeout(this.timeout || TIMEOUT, function() {
if (self.onTimeout)
self.onTimeout(new Error("FTP socket timeout"));
@@ -320,7 +321,6 @@ Ftp.getPasvPort = function(text) {
}
buffer.push(line);
}
-
}, this);
}, stop);
};
@@ -468,7 +468,9 @@ Ftp.getPasvPort = function(text) {
if (!err && res.code === 215)
self.system = res.text.toLowerCase();
});
- notifyAll(null, res);
+ self.raw.type("I", function() {
+ notifyAll(null, res);
+ });
}
else if (!err && res.code === 332) {
self.raw.acct(""); // ACCT not really supported
@@ -487,67 +489,6 @@ Ftp.getPasvPort = function(text) {
};
/**
- * Tells the server to enter passive mode, in which the server returns
- * a data port where we can listen to passive data. The callback is called
- * when the passive socket closes its connection.
- *
- * @param callback {function}: Function to execute when the data socket closes (either by success or by error).
- */
- this.getPassiveSocket = function(callback) {
- var self = this;
-
- // `_getPassive` retrieves a passive connection and sends its socket to
- // the callback.
- var _getPassive = function _getPassive(callback) {
- var doPasv = function(err, res) {
- if (err || !res || res.code !== 227) {
- if (res && res.text)
- return callback(new Error(res.text));
- else if (err)
- return callback(err);
- else
- return callback(new Error("Unknown error when trying to get into PASV mode"));
- }
-
- // Executes the next passive call, if there are any.
- var nextPasv = function nextPasv(err) {
- self.currentPasv = null;
- if (self.pasvCallBuffer.length) {
- self.currentPasv = self.pasvCallBuffer.shift();
- self.currentPasv(callback);
- }
- };
-
- var port = Ftp.getPasvPort(res.text);
- if (port === false) return callback(new Error("PASV: Bad port"));
-
- var socket = Net.createConnection(port, self.host);
- // On each one of the events below we want to move on to the
- // next passive call, if any.
- socket.on("close", nextPasv);
-
- // Send the passive socket to the callback.
- callback(null, socket);
- };
-
- self.raw.type("I", function() { self.enqueue(["pasv", doPasv]); });
- };
-
- // If there is a passive call happening, we put the requested passive
- // call in the passive call buffer, to be executed later.
- var fn = function() { _getPassive(callback); };
- if (this.currentPasv) {
- this.pasvCallBuffer.push(fn);
- }
- // otherwise, execute right away because there is no passive calls
- // occuring right now.
- else {
- this.currentPasv = fn;
- this.currentPasv(callback);
- }
- };
-
- /**
* Lists a folder's contents using a passive connection.
*
* @param filePath {String} Remote file/folder path
@@ -558,69 +499,204 @@ Ftp.getPasvPort = function(text) {
filePath = "";
}
- var self = this;
- this.getPassiveSocket(function(err, socket) {
- if (err) return callback(err);
-
- concatStream(null, socket, callback);
- self.enqueue(["list " + filePath]);
- });
+ this.orderedPasv({
+ cmd: "list " + filePath,
+ concat: true
+ }, callback);
};
this.get = function(filePath, callback) {
- var self = this;
- this.getPassiveSocket(function(err, socket) {
- if (err) return callback(err);
-
- concatStream(null, socket, callback);
- self.enqueue(["retr " + filePath]);
- });
+ this.orderedPasv({
+ cmd: "retr " + filePath,
+ concat: true
+ }, callback);
};
+ /**
+ * Returns a socket for a get (RETR) on a path. The socket is ready to be
+ * streamed, but it is returned in a paused state. Itis left to the user to
+ * resume it.
+ *
+ * @param path {String} Path to the file to be retrieved
+ * @param callback {Function} Function to call when finalized, with the socket as a parameter
+ */
this.getGetSocket = function(path, callback) {
var self = this;
- this.getPassiveSocket(function(err, socket) {
+ this.getPasvSocket(function(err, socket) {
if (err) return callback(err);
- // Pause the socket to avoid data streaming before there are any
- // listeners to it. We'll let the API consumer resume it.
- if (socket.pause)
- socket.pause();
- callback(null, socket);
- self.enqueue(["retr " + path]);
+ socket.pause();
+ self._enqueueCmd("retr " + path, function(err, res) {
+ callback(err, socket);
+ });
});
};
- this.put = function(filepath, buffer, callback) {
- var self = this;
- this.getPassiveSocket(function(err, socket) {
+ this.put = function(filePath, buffer, callback) {
+ this.orderedPasv({ cmd: "stor " + filePath }, function(err, socket) {
if (err) return callback(err);
- self.enqueue(["stor " + filepath]);
setTimeout(function() {
if (socket && socket.writable) {
socket.end(buffer);
callback(null, buffer);
}
else {
- console.log("ftp error: couldn't retrieve pasv connection for command 'stor " + filepath + "'.");
+ console.log("ftp error: couldn't retrieve connection for 'stor " + filePath + "'.");
}
}, 100);
});
};
- this.getPutSocket = function(filepath, callback) {
+ this.getPutSocket = function(path, callback) {
var self = this;
- this.getPassiveSocket(function(err, socket) {
+ this.getPasvSocket(function(err, socket) {
if (err) return callback(err);
- self.enqueue(["stor " + filepath]);
- setTimeout(function() {
- callback(null, socket);
+
+ var hadErr;
+ self._enqueueCmd("stor " + path, function(err, res) {
+ if (err) hadErr = err;
+ });
+
+ // This is not a great solution. A STOR command only gives back an error
+ // or a Mark. Marks are not very reliable, which is why jsftp doesn't
+ // take them into account. But then if STOR is successful we never
+ // know because we don't receive any response.
+ // Still, the socket is just open and we can start to pump, so we set
+ // a timeout and after that we pass the socket to the callback along
+ // with any errors that STOR gave back.
+ // A way to make this more reliable would be to look at the response
+ // mark (Should be 150) and only then if it is ok send the socket back.
+ setTimeout(function(err, res) {
+ callback(hadErr, socket);
}, 100);
});
};
/**
+ * Unfortunately node 0.6 has some serious problems with streams, namely that
+ * we can't count on socket.pause() actually pausing the stream, so we expand
+ * its behavior to buffer any data that is transmitted between pausing and
+ * resuming.
+ *
+ * @param port {Number} Socket port
+ * @param host {String} Socket host
+ * @returns {Net.socket}
+ */
+ this._getStreamableSocket = function(port, host) {
+ var socket = Net.createConnection(port, host);
+ var buffer = [];
+ var pause = socket.pause;
+ var pauseCount = 0;
+
+ socket.pause = function() {
+ if (++pauseCount !== 1) return;
+
+ pause.call(this);
+ socket.on("data", onData);
+ socket.on("end", onEnd);
+ socket.on("close", onClose);
+ socket.on("error", onError);
+ };
+ function onData(chunk) { buffer.push(["data", chunk]); }
+ function onEnd() { buffer.push(["end"]); }
+ function onClose(hadError) { buffer.push(["close", hadError]); }
+ function onError(err) { buffer.push(["error", err]); }
+
+ var resume = socket.resume;
+ socket.resume = function() {
+ if (pauseCount === 0) return;
+ if (--pauseCount !== 0) return;
+
+ this.removeListener("data", onData);
+ this.removeListener("end", onEnd);
+ this.removeListener("close", onClose);
+ this.removeListener("error", onError);
+
+ buffer.forEach(socket.emit.bind(socket));
+ resume.call(this);
+ };
+
+ return socket;
+ };
+
+ this.getPasvSocket = function(callback) {
+ var self = this;
+ var doPasv = function(err, res) {
+ if (err || !res || res.code !== 227) {
+ if (res && res.text)
+ return callback(new Error(res.text));
+ else if (err)
+ return callback(err);
+ else
+ return callback(new Error("Unknown error when trying to get into PASV mode"));
+ }
+
+ var port = Ftp.getPasvPort(res.text);
+ if (port === false)
+ return callback(new Error("PASV: Bad port"));
+
+ var socket = self._getStreamableSocket(port, self.host);
+ socket.setTimeout(self.timeout || TIMEOUT);
+ callback(null, socket);
+ };
+ self._enqueueCmd("pasv", doPasv);
+ };
+
+ /**
+ * Sequential passive mode. When this function is used on a command, it will
+ * wait until the current command is executed, and then it will proceed to
+ * execute the next command in the queue.
+ *
+ * The options object contains two options:
+ * - options.cmd: is an array containing the command string e.g. ["stor " + filePath]
+ * - options.concat: Whether the result of the operation should be concatenated
+ * and delivered to the allback when finished.
+ *
+ * @param options {Object}: Contains the options for this function
+ * @param callback {Function}: Function to execute when the data socket closes (either by success or by error).
+ */
+ this.orderedPasv = function(options, callback) {
+ var self = this;
+ // If there is a passive call happening, we put the requested passive
+ // call in the passive call buffer, to be executed later.
+ var fn = function() {
+ this.getPasvSocket(function(err, socket) {
+ if (err) return callback(err);
+
+ // Executes the next passive call, if there are any.
+ var nextPasv = function nextPasv(err) {
+ self.currentPasv = null;
+ if (self.pasvCallBuffer.length) {
+ self.currentPasv = self.pasvCallBuffer.shift();
+ self.currentPasv(callback);
+ }
+ };
+
+ // On each one of the events below we want to move on to the
+ // next passive call, if any.
+ socket.on("close", nextPasv);
+ if (options.concat)
+ Ftp._concatStream(err, socket, callback);
+ else
+ callback(err, socket);
+
+ self._enqueueCmd(options.cmd);
+ });
+ };
+
+ if (this.currentPasv) {
+ this.pasvCallBuffer.push(fn);
+ }
+ // otherwise, execute right away because there are no passive calls
+ // happening right now.
+ else {
+ this.currentPasv = fn;
+ this.currentPasv();
+ }
+ },
+
+ /**
* Provides information about files. It lists a directory contents or
* a single file and yields an array of file objects. The file objects
* contain several properties. The main difference between this method and
@@ -687,7 +763,6 @@ Ftp.getPasvPort = function(text) {
var self = this;
self.raw.rnfr(from, function(err, res) {
if (err) return callback(err);
-
self.raw.rnto(to, function(err, res) { callback(err, res); });
});
};
@@ -701,7 +776,7 @@ Ftp.getPasvPort = function(text) {
};
}).call(Ftp.prototype);
-function concat(bufs) {
+Ftp._concat = function(bufs) {
var buffer, length = 0, index = 0;
if (!Array.isArray(bufs))
@@ -720,13 +795,18 @@ function concat(bufs) {
});
return buffer;
-}
+};
-function concatStream(err, socket, callback) {
+Ftp._concatStream = function(err, socket, callback) {
if (err) return callback(err);
var pieces = [];
socket.on("data", function(p) { pieces.push(p); });
- socket.on("end", function() { callback(null, concat(pieces)); });
- socket.on("error", function(e) { callback(e); });
-}
+ socket.on("close", function(hadError) {
+ if (hadError)
+ return callback(new Error("Socket connection error"));
+
+ callback(null, Ftp._concat(pieces));
+ });
+ socket.resume();
+};
View
2  package.json
@@ -1,7 +1,7 @@
{
"name": "jsftp",
"id": "jsftp",
- "version": "0.4.9",
+ "version": "0.5.0",
"description": "A sane FTP client implementation for NodeJS",
"keywords": [ "ftp", "protocol", "files", "server", "client", "async" ],
"author": "Sergi Mansilla <sergi.mansilla@gmail.com> (http://sergimansilla.com)",
View
124 test/jsftp_test.js
@@ -4,6 +4,7 @@
* @author Sergi Mansilla <sergi.mansilla@gmail.com>
* @license https://github.com/sergi/jsFTP/blob/master/LICENSE MIT License
*/
+ /*global it describe beforeEach afterEach */
"use strict";
@@ -20,7 +21,7 @@ var FTPCredentials = {
host: "localhost",
user: "user",
port: 3334,
- pass: "12345"
+ pass: "12345",
};
var CWD = process.cwd() + "/test";
@@ -44,7 +45,6 @@ describe("jsftp test suite", function() {
}
}
- var self = this;
setTimeout(function() {
ftp = new Ftp(FTPCredentials);
next();
@@ -54,13 +54,13 @@ describe("jsftp test suite", function() {
afterEach(function(next) {
if (daemon)
daemon.kill();
-
- var self = this;
setTimeout(function() {
- ftp.destroy();
- ftp = null;
+ if (ftp) {
+ ftp.destroy();
+ ftp = null;
+ }
next();
- }, 200);
+ }, 100);
}),
it("test features command", function(next) {
@@ -87,7 +87,7 @@ describe("jsftp test suite", function() {
it("test print working directory", function(next) {
ftp.raw.pwd(function(err, res) {
- if (err) throw err;
+ assert(!err);
var code = parseInt(res.code, 10);
assert.ok(code === 257, "PWD command was not successful");
@@ -149,8 +149,6 @@ describe("jsftp test suite", function() {
});
it("test create and delete a directory", function(next) {
- var self = this;
-
var newDir = remoteCWD + "/ftp_test_dir";
ftp.raw.mkd(newDir, function(err, res) {
assert.ok(!err);
@@ -164,7 +162,6 @@ describe("jsftp test suite", function() {
});
it("test create and delete a directory containing a space", function(next) {
- var self = this;
var newDir = remoteCWD + "/ftp test dür";
ftp.raw.mkd(newDir, function(err, res) {
assert.ok(!err);
@@ -178,7 +175,6 @@ describe("jsftp test suite", function() {
});
it("test create and delete a file", function(next) {
- var self = this;
var filePath = remoteCWD + "/file_ftp_test.txt";
Fs.readFile(CWD + "/jsftp_test.js", "binary", function(err, data) {
var buffer = new Buffer(data, "binary");
@@ -200,7 +196,6 @@ describe("jsftp test suite", function() {
});
it("test rename a file", function(next) {
- var self = this;
var from = remoteCWD + "/file_ftp_test.txt";
var to = remoteCWD + "/file_ftp_test_renamed.txt";
Fs.readFile(CWD + "/jsftp_test.js", "binary", function(err, data) {
@@ -266,14 +261,13 @@ describe("jsftp test suite", function() {
next();
});
- function handler() { counter++; };
+ function handler() { counter++; }
});
it("test get fileList array", function(next) {
var file1 = "testfile.txt";
ftp.raw.pwd(function(err, res) {
- var parent, pathDir, path;
var path = remoteCWD + "/" + file1;
ftp.put(path, new Buffer("test"), function(err, res) {
@@ -359,7 +353,6 @@ describe("jsftp test suite", function() {
ftp.socket.end();
ftp.raw.quit(function(err, res) {
if (err) throw err;
-
next();
});
});
@@ -376,9 +369,102 @@ describe("jsftp test suite", function() {
_next();
};
- FTPCredentials.onConnect = clientOnConnect;
+ var client = new Ftp({
+ host: "localhost",
+ user: "user",
+ port: 3334,
+ pass: "12345",
+ onConnect: clientOnConnect
+ });
+ });
+
+ it("test PASV streaming: Copy file using piping", function(next) {
+ var filePath = Path.join(remoteCWD, "testfile.txt");
+ var originalData = Fs.readFileSync(Path.join(CWD, "test_c9", "testfile.txt"));
+ ftp.getGetSocket(filePath, function(err, readable) {
+ assert(!err, err);
+ assert.ok(readable);
- var client = new Ftp(FTPCredentials);
+ readable.on("error", error);
+
+ function error(err) {
+ assert.ok(!err);
+
+ if (readable.destroy)
+ readable.destroy();
+
+ next();
+ }
+
+ var remoteCopy = filePath + ".bak";
+ ftp.getPutSocket(remoteCopy, function(err, socket) {
+ assert.ok(!err);
+ readable.pipe(socket);
+
+ readable.on("close", finish);
+ readable.on("error", error);
+ readable.resume();
+
+ function finish(hadError) {
+ assert.ok(!hadError);
+ ftp.get(remoteCopy, function(err, data) {
+ assert.ok(!err, err);
+ assert.equal(originalData.toString("utf8").length, data.toString("utf8").length);
+
+ ftp.raw.dele(remoteCopy, function(err, data) {
+ assert.ok(!err);
+ next();
+ });
+ });
+ }
+ });
+ });
+ });
+
+ it("Test that streaming GET (RETR) retrieves a file properly", function(next) {
+ var path = Path.join(CWD, "test_c9", "testfile.txt");
+ var originalData = Fs.readFileSync(path);
+ ftp.getGetSocket(Path.join(remoteCWD, "testfile.txt"), function(err, readable) {
+ assert.ok(!err);
+ Ftp._concatStream(err, readable, function(err, buffer) {
+ assert.ok(!err);
+ assert.equal(buffer.toString(), originalData.toString());
+ next();
+ });
+ });
+ });
+
+ it("Test that streaming GET (RETR) fails when a file is not present", function(next) {
+ ftp.getGetSocket("unexisting/file/path", function(err, readable) {
+ assert.ok(err);
+ assert.equal(550, err.code);
+ next();
+ });
});
-});
+ it("Test that streaming PUT (STOR) stores a file properly", function(next) {
+ var path = Path.join(CWD, "test_c9", "testfile.txt");
+ var originalData = Fs.createReadStream(Path.join(CWD, "test_c9", "testfile.txt"));
+ originalData.pause();
+
+ ftp.getPutSocket(Path.join(remoteCWD, "testfile.txt.bak"), function(err, socket) {
+ assert.ok(!err);
+ originalData.pipe(socket);
+ originalData.resume();
+ Ftp._concatStream(err, originalData, function(err, buffer) {
+ assert.ok(!err);
+ Fs.readFile(path, "utf8", function(err, original) {
+ assert.equal(buffer.toString("utf8"), original);
+ next();
+ });
+ });
+ });
+ });
+
+ it("Test that streaming PUT (STOR) fails when a file is not present", function(next) {
+ ftp.getPutSocket("unexisting/file/path", function(err, socket) {
+ assert.ok(err);
+ next();
+ });
+ });
+});
Something went wrong with that request. Please try again.