Skip to content
Browse files

Merge pull request #31 from ajaxorg/master

Implement proper streaming for passive commands and add tests
  • Loading branch information...
2 parents 7d15828 + f44ae03 commit d13451b8e352474f7b8b095985fffab3244217e9 Sergi Mansilla committed
Showing with 290 additions and 124 deletions.
  1. +184 −104 lib/jsftp.js
  2. +1 −1 package.json
  3. +105 −19 test/jsftp_test.js
View
288 lib/jsftp.js
@@ -60,7 +60,7 @@ var Ftp = module.exports = function(cfg) {
if (DEBUG_MODE) {
var self = this;
["command", "response", "connect", "reconnect", "disconnect"].forEach(function(event) {
- self.emitter.on(event, self.log);
+ self.emitter.on(event, console.log);
});
}
@@ -190,11 +190,12 @@ Ftp.getPasvPort = function(text) {
completeCmd += " " + args.join(" ");
}
- self._enqueueCmd(completeCmd.trim(), callback || function(){});
+ self._enqueueCmd(completeCmd.trim(), callback);
};
};
this._enqueueCmd = function(action, callback) {
+ if (!callback) callback = function(){};
if (this.socket && this.socket.writable) {
this._authAndEnqueue(action, callback);
}
@@ -239,7 +240,7 @@ Ftp.getPasvPort = function(text) {
this.socket = Net.createConnection(port, host);
var self = this;
- this.socket.setTimeout(TIMEOUT, function() {
+ this.socket.setTimeout(this.timeout || TIMEOUT, function() {
if (self.onTimeout)
self.onTimeout(new Error("FTP socket timeout"));
@@ -320,7 +321,6 @@ Ftp.getPasvPort = function(text) {
}
buffer.push(line);
}
-
}, this);
}, stop);
};
@@ -468,7 +468,9 @@ Ftp.getPasvPort = function(text) {
if (!err && res.code === 215)
self.system = res.text.toLowerCase();
});
- notifyAll(null, res);
+ self.raw.type("I", function() {
+ notifyAll(null, res);
+ });
}
else if (!err && res.code === 332) {
self.raw.acct(""); // ACCT not really supported
@@ -487,67 +489,6 @@ Ftp.getPasvPort = function(text) {
};
/**
- * Tells the server to enter passive mode, in which the server returns
- * a data port where we can listen to passive data. The callback is called
- * when the passive socket closes its connection.
- *
- * @param callback {function}: Function to execute when the data socket closes (either by success or by error).
- */
- this.getPassiveSocket = function(callback) {
- var self = this;
-
- // `_getPassive` retrieves a passive connection and sends its socket to
- // the callback.
- var _getPassive = function _getPassive(callback) {
- var doPasv = function(err, res) {
- if (err || !res || res.code !== 227) {
- if (res && res.text)
- return callback(new Error(res.text));
- else if (err)
- return callback(err);
- else
- return callback(new Error("Unknown error when trying to get into PASV mode"));
- }
-
- // Executes the next passive call, if there are any.
- var nextPasv = function nextPasv(err) {
- self.currentPasv = null;
- if (self.pasvCallBuffer.length) {
- self.currentPasv = self.pasvCallBuffer.shift();
- self.currentPasv(callback);
- }
- };
-
- var port = Ftp.getPasvPort(res.text);
- if (port === false) return callback(new Error("PASV: Bad port"));
-
- var socket = Net.createConnection(port, self.host);
- // On each one of the events below we want to move on to the
- // next passive call, if any.
- socket.on("close", nextPasv);
-
- // Send the passive socket to the callback.
- callback(null, socket);
- };
-
- self.raw.type("I", function() { self.enqueue(["pasv", doPasv]); });
- };
-
- // If there is a passive call happening, we put the requested passive
- // call in the passive call buffer, to be executed later.
- var fn = function() { _getPassive(callback); };
- if (this.currentPasv) {
- this.pasvCallBuffer.push(fn);
- }
- // otherwise, execute right away because there is no passive calls
- // occuring right now.
- else {
- this.currentPasv = fn;
- this.currentPasv(callback);
- }
- };
-
- /**
* Lists a folder's contents using a passive connection.
*
* @param filePath {String} Remote file/folder path
@@ -558,69 +499,204 @@ Ftp.getPasvPort = function(text) {
filePath = "";
}
- var self = this;
- this.getPassiveSocket(function(err, socket) {
- if (err) return callback(err);
-
- concatStream(null, socket, callback);
- self.enqueue(["list " + filePath]);
- });
+ this.orderedPasv({
+ cmd: "list " + filePath,
+ concat: true
+ }, callback);
};
this.get = function(filePath, callback) {
- var self = this;
- this.getPassiveSocket(function(err, socket) {
- if (err) return callback(err);
-
- concatStream(null, socket, callback);
- self.enqueue(["retr " + filePath]);
- });
+ this.orderedPasv({
+ cmd: "retr " + filePath,
+ concat: true
+ }, callback);
};
+ /**
+ * Returns a socket for a get (RETR) on a path. The socket is ready to be
+ * streamed, but it is returned in a paused state. Itis left to the user to
+ * resume it.
+ *
+ * @param path {String} Path to the file to be retrieved
+ * @param callback {Function} Function to call when finalized, with the socket as a parameter
+ */
this.getGetSocket = function(path, callback) {
var self = this;
- this.getPassiveSocket(function(err, socket) {
+ this.getPasvSocket(function(err, socket) {
if (err) return callback(err);
- // Pause the socket to avoid data streaming before there are any
- // listeners to it. We'll let the API consumer resume it.
- if (socket.pause)
- socket.pause();
- callback(null, socket);
- self.enqueue(["retr " + path]);
+ socket.pause();
+ self._enqueueCmd("retr " + path, function(err, res) {
+ callback(err, socket);
+ });
});
};
- this.put = function(filepath, buffer, callback) {
- var self = this;
- this.getPassiveSocket(function(err, socket) {
+ this.put = function(filePath, buffer, callback) {
+ this.orderedPasv({ cmd: "stor " + filePath }, function(err, socket) {
if (err) return callback(err);
- self.enqueue(["stor " + filepath]);
setTimeout(function() {
if (socket && socket.writable) {
socket.end(buffer);
callback(null, buffer);
}
else {
- console.log("ftp error: couldn't retrieve pasv connection for command 'stor " + filepath + "'.");
+ console.log("ftp error: couldn't retrieve connection for 'stor " + filePath + "'.");
}
}, 100);
});
};
- this.getPutSocket = function(filepath, callback) {
+ this.getPutSocket = function(path, callback) {
var self = this;
- this.getPassiveSocket(function(err, socket) {
+ this.getPasvSocket(function(err, socket) {
if (err) return callback(err);
- self.enqueue(["stor " + filepath]);
- setTimeout(function() {
- callback(null, socket);
+
+ var hadErr;
+ self._enqueueCmd("stor " + path, function(err, res) {
+ if (err) hadErr = err;
+ });
+
+ // This is not a great solution. A STOR command only gives back an error
+ // or a Mark. Marks are not very reliable, which is why jsftp doesn't
+ // take them into account. But then if STOR is successful we never
+ // know because we don't receive any response.
+ // Still, the socket is just open and we can start to pump, so we set
+ // a timeout and after that we pass the socket to the callback along
+ // with any errors that STOR gave back.
+ // A way to make this more reliable would be to look at the response
+ // mark (Should be 150) and only then if it is ok send the socket back.
+ setTimeout(function(err, res) {
+ callback(hadErr, socket);
}, 100);
});
};
/**
+ * Unfortunately node 0.6 has some serious problems with streams, namely that
+ * we can't count on socket.pause() actually pausing the stream, so we expand
+ * its behavior to buffer any data that is transmitted between pausing and
+ * resuming.
+ *
+ * @param port {Number} Socket port
+ * @param host {String} Socket host
+ * @returns {Net.socket}
+ */
+ this._getStreamableSocket = function(port, host) {
+ var socket = Net.createConnection(port, host);
+ var buffer = [];
+ var pause = socket.pause;
+ var pauseCount = 0;
+
+ socket.pause = function() {
+ if (++pauseCount !== 1) return;
+
+ pause.call(this);
+ socket.on("data", onData);
+ socket.on("end", onEnd);
+ socket.on("close", onClose);
+ socket.on("error", onError);
+ };
+ function onData(chunk) { buffer.push(["data", chunk]); }
+ function onEnd() { buffer.push(["end"]); }
+ function onClose(hadError) { buffer.push(["close", hadError]); }
+ function onError(err) { buffer.push(["error", err]); }
+
+ var resume = socket.resume;
+ socket.resume = function() {
+ if (pauseCount === 0) return;
+ if (--pauseCount !== 0) return;
+
+ this.removeListener("data", onData);
+ this.removeListener("end", onEnd);
+ this.removeListener("close", onClose);
+ this.removeListener("error", onError);
+
+ buffer.forEach(socket.emit.bind(socket));
+ resume.call(this);
+ };
+
+ return socket;
+ };
+
+ this.getPasvSocket = function(callback) {
+ var self = this;
+ var doPasv = function(err, res) {
+ if (err || !res || res.code !== 227) {
+ if (res && res.text)
+ return callback(new Error(res.text));
+ else if (err)
+ return callback(err);
+ else
+ return callback(new Error("Unknown error when trying to get into PASV mode"));
+ }
+
+ var port = Ftp.getPasvPort(res.text);
+ if (port === false)
+ return callback(new Error("PASV: Bad port"));
+
+ var socket = self._getStreamableSocket(port, self.host);
+ socket.setTimeout(self.timeout || TIMEOUT);
+ callback(null, socket);
+ };
+ self._enqueueCmd("pasv", doPasv);
+ };
+
+ /**
+ * Sequential passive mode. When this function is used on a command, it will
+ * wait until the current command is executed, and then it will proceed to
+ * execute the next command in the queue.
+ *
+ * The options object contains two options:
+ * - options.cmd: is an array containing the command string e.g. ["stor " + filePath]
+ * - options.concat: Whether the result of the operation should be concatenated
+ * and delivered to the allback when finished.
+ *
+ * @param options {Object}: Contains the options for this function
+ * @param callback {Function}: Function to execute when the data socket closes (either by success or by error).
+ */
+ this.orderedPasv = function(options, callback) {
+ var self = this;
+ // If there is a passive call happening, we put the requested passive
+ // call in the passive call buffer, to be executed later.
+ var fn = function() {
+ this.getPasvSocket(function(err, socket) {
+ if (err) return callback(err);
+
+ // Executes the next passive call, if there are any.
+ var nextPasv = function nextPasv(err) {
+ self.currentPasv = null;
+ if (self.pasvCallBuffer.length) {
+ self.currentPasv = self.pasvCallBuffer.shift();
+ self.currentPasv(callback);
+ }
+ };
+
+ // On each one of the events below we want to move on to the
+ // next passive call, if any.
+ socket.on("close", nextPasv);
+ if (options.concat)
+ Ftp._concatStream(err, socket, callback);
+ else
+ callback(err, socket);
+
+ self._enqueueCmd(options.cmd);
+ });
+ };
+
+ if (this.currentPasv) {
+ this.pasvCallBuffer.push(fn);
+ }
+ // otherwise, execute right away because there are no passive calls
+ // happening right now.
+ else {
+ this.currentPasv = fn;
+ this.currentPasv();
+ }
+ },
+
+ /**
* Provides information about files. It lists a directory contents or
* a single file and yields an array of file objects. The file objects
* contain several properties. The main difference between this method and
@@ -687,7 +763,6 @@ Ftp.getPasvPort = function(text) {
var self = this;
self.raw.rnfr(from, function(err, res) {
if (err) return callback(err);
-
self.raw.rnto(to, function(err, res) { callback(err, res); });
});
};
@@ -701,7 +776,7 @@ Ftp.getPasvPort = function(text) {
};
}).call(Ftp.prototype);
-function concat(bufs) {
+Ftp._concat = function(bufs) {
var buffer, length = 0, index = 0;
if (!Array.isArray(bufs))
@@ -720,13 +795,18 @@ function concat(bufs) {
});
return buffer;
-}
+};
-function concatStream(err, socket, callback) {
+Ftp._concatStream = function(err, socket, callback) {
if (err) return callback(err);
var pieces = [];
socket.on("data", function(p) { pieces.push(p); });
- socket.on("end", function() { callback(null, concat(pieces)); });
- socket.on("error", function(e) { callback(e); });
-}
+ socket.on("close", function(hadError) {
+ if (hadError)
+ return callback(new Error("Socket connection error"));
+
+ callback(null, Ftp._concat(pieces));
+ });
+ socket.resume();
+};
View
2 package.json
@@ -1,7 +1,7 @@
{
"name": "jsftp",
"id": "jsftp",
- "version": "0.4.9",
+ "version": "0.5.0",
"description": "A sane FTP client implementation for NodeJS",
"keywords": [ "ftp", "protocol", "files", "server", "client", "async" ],
"author": "Sergi Mansilla <sergi.mansilla@gmail.com> (http://sergimansilla.com)",
View
124 test/jsftp_test.js
@@ -4,6 +4,7 @@
* @author Sergi Mansilla <sergi.mansilla@gmail.com>
* @license https://github.com/sergi/jsFTP/blob/master/LICENSE MIT License
*/
+ /*global it describe beforeEach afterEach */
"use strict";
@@ -20,7 +21,7 @@ var FTPCredentials = {
host: "localhost",
user: "user",
port: 3334,
- pass: "12345"
+ pass: "12345",
};
var CWD = process.cwd() + "/test";
@@ -44,7 +45,6 @@ describe("jsftp test suite", function() {
}
}
- var self = this;
setTimeout(function() {
ftp = new Ftp(FTPCredentials);
next();
@@ -54,13 +54,13 @@ describe("jsftp test suite", function() {
afterEach(function(next) {
if (daemon)
daemon.kill();
-
- var self = this;
setTimeout(function() {
- ftp.destroy();
- ftp = null;
+ if (ftp) {
+ ftp.destroy();
+ ftp = null;
+ }
next();
- }, 200);
+ }, 100);
}),
it("test features command", function(next) {
@@ -87,7 +87,7 @@ describe("jsftp test suite", function() {
it("test print working directory", function(next) {
ftp.raw.pwd(function(err, res) {
- if (err) throw err;
+ assert(!err);
var code = parseInt(res.code, 10);
assert.ok(code === 257, "PWD command was not successful");
@@ -149,8 +149,6 @@ describe("jsftp test suite", function() {
});
it("test create and delete a directory", function(next) {
- var self = this;
-
var newDir = remoteCWD + "/ftp_test_dir";
ftp.raw.mkd(newDir, function(err, res) {
assert.ok(!err);
@@ -164,7 +162,6 @@ describe("jsftp test suite", function() {
});
it("test create and delete a directory containing a space", function(next) {
- var self = this;
var newDir = remoteCWD + "/ftp test dür";
ftp.raw.mkd(newDir, function(err, res) {
assert.ok(!err);
@@ -178,7 +175,6 @@ describe("jsftp test suite", function() {
});
it("test create and delete a file", function(next) {
- var self = this;
var filePath = remoteCWD + "/file_ftp_test.txt";
Fs.readFile(CWD + "/jsftp_test.js", "binary", function(err, data) {
var buffer = new Buffer(data, "binary");
@@ -200,7 +196,6 @@ describe("jsftp test suite", function() {
});
it("test rename a file", function(next) {
- var self = this;
var from = remoteCWD + "/file_ftp_test.txt";
var to = remoteCWD + "/file_ftp_test_renamed.txt";
Fs.readFile(CWD + "/jsftp_test.js", "binary", function(err, data) {
@@ -266,14 +261,13 @@ describe("jsftp test suite", function() {
next();
});
- function handler() { counter++; };
+ function handler() { counter++; }
});
it("test get fileList array", function(next) {
var file1 = "testfile.txt";
ftp.raw.pwd(function(err, res) {
- var parent, pathDir, path;
var path = remoteCWD + "/" + file1;
ftp.put(path, new Buffer("test"), function(err, res) {
@@ -359,7 +353,6 @@ describe("jsftp test suite", function() {
ftp.socket.end();
ftp.raw.quit(function(err, res) {
if (err) throw err;
-
next();
});
});
@@ -376,9 +369,102 @@ describe("jsftp test suite", function() {
_next();
};
- FTPCredentials.onConnect = clientOnConnect;
+ var client = new Ftp({
+ host: "localhost",
+ user: "user",
+ port: 3334,
+ pass: "12345",
+ onConnect: clientOnConnect
+ });
+ });
+
+ it("test PASV streaming: Copy file using piping", function(next) {
+ var filePath = Path.join(remoteCWD, "testfile.txt");
+ var originalData = Fs.readFileSync(Path.join(CWD, "test_c9", "testfile.txt"));
+ ftp.getGetSocket(filePath, function(err, readable) {
+ assert(!err, err);
+ assert.ok(readable);
- var client = new Ftp(FTPCredentials);
+ readable.on("error", error);
+
+ function error(err) {
+ assert.ok(!err);
+
+ if (readable.destroy)
+ readable.destroy();
+
+ next();
+ }
+
+ var remoteCopy = filePath + ".bak";
+ ftp.getPutSocket(remoteCopy, function(err, socket) {
+ assert.ok(!err);
+ readable.pipe(socket);
+
+ readable.on("close", finish);
+ readable.on("error", error);
+ readable.resume();
+
+ function finish(hadError) {
+ assert.ok(!hadError);
+ ftp.get(remoteCopy, function(err, data) {
+ assert.ok(!err, err);
+ assert.equal(originalData.toString("utf8").length, data.toString("utf8").length);
+
+ ftp.raw.dele(remoteCopy, function(err, data) {
+ assert.ok(!err);
+ next();
+ });
+ });
+ }
+ });
+ });
+ });
+
+ it("Test that streaming GET (RETR) retrieves a file properly", function(next) {
+ var path = Path.join(CWD, "test_c9", "testfile.txt");
+ var originalData = Fs.readFileSync(path);
+ ftp.getGetSocket(Path.join(remoteCWD, "testfile.txt"), function(err, readable) {
+ assert.ok(!err);
+ Ftp._concatStream(err, readable, function(err, buffer) {
+ assert.ok(!err);
+ assert.equal(buffer.toString(), originalData.toString());
+ next();
+ });
+ });
+ });
+
+ it("Test that streaming GET (RETR) fails when a file is not present", function(next) {
+ ftp.getGetSocket("unexisting/file/path", function(err, readable) {
+ assert.ok(err);
+ assert.equal(550, err.code);
+ next();
+ });
});
-});
+ it("Test that streaming PUT (STOR) stores a file properly", function(next) {
+ var path = Path.join(CWD, "test_c9", "testfile.txt");
+ var originalData = Fs.createReadStream(Path.join(CWD, "test_c9", "testfile.txt"));
+ originalData.pause();
+
+ ftp.getPutSocket(Path.join(remoteCWD, "testfile.txt.bak"), function(err, socket) {
+ assert.ok(!err);
+ originalData.pipe(socket);
+ originalData.resume();
+ Ftp._concatStream(err, originalData, function(err, buffer) {
+ assert.ok(!err);
+ Fs.readFile(path, "utf8", function(err, original) {
+ assert.equal(buffer.toString("utf8"), original);
+ next();
+ });
+ });
+ });
+ });
+
+ it("Test that streaming PUT (STOR) fails when a file is not present", function(next) {
+ ftp.getPutSocket("unexisting/file/path", function(err, socket) {
+ assert.ok(err);
+ next();
+ });
+ });
+});

0 comments on commit d13451b

Please sign in to comment.
Something went wrong with that request. Please try again.