Permalink
Browse files

Merge pull request #4 from ajaxorg/fix_streaming

Implement proper streaming capabilities for passive commands
  • Loading branch information...
2 parents c3dee03 + 9782f1e commit 486b0174f5da91327f3c689f267e6dc9d70dd273 @fjakobs fjakobs committed Nov 20, 2012
Showing with 295 additions and 127 deletions.
  1. +184 −104 lib/jsftp.js
  2. +111 −23 test/jsftp_test.js
View
@@ -60,7 +60,7 @@ var Ftp = module.exports = function(cfg) {
if (DEBUG_MODE) {
var self = this;
["command", "response", "connect", "reconnect", "disconnect"].forEach(function(event) {
- self.emitter.on(event, self.log);
+ self.emitter.on(event, console.log);
});
}
@@ -190,11 +190,12 @@ Ftp.getPasvPort = function(text) {
completeCmd += " " + args.join(" ");
}
- self._enqueueCmd(completeCmd.trim(), callback || function(){});
+ self._enqueueCmd(completeCmd.trim(), callback);
};
};
this._enqueueCmd = function(action, callback) {
+ if (!callback) callback = function(){};
if (this.socket && this.socket.writable) {
this._authAndEnqueue(action, callback);
}
@@ -239,7 +240,7 @@ Ftp.getPasvPort = function(text) {
this.socket = Net.createConnection(port, host);
var self = this;
- this.socket.setTimeout(TIMEOUT, function() {
+ this.socket.setTimeout(this.timeout || TIMEOUT, function() {
if (self.onTimeout)
self.onTimeout(new Error("FTP socket timeout"));
@@ -320,7 +321,6 @@ Ftp.getPasvPort = function(text) {
}
buffer.push(line);
}
-
}, this);
}, stop);
};
@@ -468,7 +468,9 @@ Ftp.getPasvPort = function(text) {
if (!err && res.code === 215)
self.system = res.text.toLowerCase();
});
- notifyAll(null, res);
+ self.raw.type("I", function() {
+ notifyAll(null, res);
+ });
}
else if (!err && res.code === 332) {
self.raw.acct(""); // ACCT not really supported
@@ -487,67 +489,6 @@ Ftp.getPasvPort = function(text) {
};
/**
- * Tells the server to enter passive mode, in which the server returns
- * a data port where we can listen to passive data. The callback is called
- * when the passive socket closes its connection.
- *
- * @param callback {function}: Function to execute when the data socket closes (either by success or by error).
- */
- this.getPassiveSocket = function(callback) {
- var self = this;
-
- // `_getPassive` retrieves a passive connection and sends its socket to
- // the callback.
- var _getPassive = function _getPassive(callback) {
- var doPasv = function(err, res) {
- if (err || !res || res.code !== 227) {
- if (res && res.text)
- return callback(new Error(res.text));
- else if (err)
- return callback(err);
- else
- return callback(new Error("Unknown error when trying to get into PASV mode"));
- }
-
- // Executes the next passive call, if there are any.
- var nextPasv = function nextPasv(err) {
- self.currentPasv = null;
- if (self.pasvCallBuffer.length) {
- self.currentPasv = self.pasvCallBuffer.shift();
- self.currentPasv(callback);
- }
- };
-
- var port = Ftp.getPasvPort(res.text);
- if (port === false) return callback(new Error("PASV: Bad port"));
-
- var socket = Net.createConnection(port, self.host);
- // On each one of the events below we want to move on to the
- // next passive call, if any.
- socket.on("close", nextPasv);
-
- // Send the passive socket to the callback.
- callback(null, socket);
- };
-
- self.raw.type("I", function() { self.enqueue(["pasv", doPasv]); });
- };
-
- // If there is a passive call happening, we put the requested passive
- // call in the passive call buffer, to be executed later.
- var fn = function() { _getPassive(callback); };
- if (this.currentPasv) {
- this.pasvCallBuffer.push(fn);
- }
- // otherwise, execute right away because there is no passive calls
- // occuring right now.
- else {
- this.currentPasv = fn;
- this.currentPasv(callback);
- }
- };
-
- /**
* Lists a folder's contents using a passive connection.
*
* @param filePath {String} Remote file/folder path
@@ -558,69 +499,204 @@ Ftp.getPasvPort = function(text) {
filePath = "";
}
- var self = this;
- this.getPassiveSocket(function(err, socket) {
- if (err) return callback(err);
-
- concatStream(null, socket, callback);
- self.enqueue(["list " + filePath]);
- });
+ this.orderedPasv({
+ cmd: "list " + filePath,
+ concat: true
+ }, callback);
};
this.get = function(filePath, callback) {
- var self = this;
- this.getPassiveSocket(function(err, socket) {
- if (err) return callback(err);
-
- concatStream(null, socket, callback);
- self.enqueue(["retr " + filePath]);
- });
+ this.orderedPasv({
+ cmd: "retr " + filePath,
+ concat: true
+ }, callback);
};
+ /**
+ * Returns a socket for a get (RETR) on a path. The socket is ready to be
+ * streamed, but it is returned in a paused state. Itis left to the user to
+ * resume it.
+ *
+ * @param path {String} Path to the file to be retrieved
+ * @param callback {Function} Function to call when finalized, with the socket as a parameter
+ */
this.getGetSocket = function(path, callback) {
var self = this;
- this.getPassiveSocket(function(err, socket) {
+ this.getPasvSocket(function(err, socket) {
if (err) return callback(err);
- // Pause the socket to avoid data streaming before there are any
- // listeners to it. We'll let the API consumer resume it.
- if (socket.pause)
- socket.pause();
- callback(null, socket);
- self.enqueue(["retr " + path]);
+ socket.pause();
+ self._enqueueCmd("retr " + path, function(err, res) {
+ callback(err, socket);
+ });
});
};
- this.put = function(filepath, buffer, callback) {
- var self = this;
- this.getPassiveSocket(function(err, socket) {
+ this.put = function(filePath, buffer, callback) {
+ this.orderedPasv({ cmd: "stor " + filePath }, function(err, socket) {
if (err) return callback(err);
- self.enqueue(["stor " + filepath]);
setTimeout(function() {
if (socket && socket.writable) {
socket.end(buffer);
callback(null, buffer);
}
else {
- console.log("ftp error: couldn't retrieve pasv connection for command 'stor " + filepath + "'.");
+ console.log("ftp error: couldn't retrieve connection for 'stor " + filePath + "'.");
}
}, 100);
});
};
- this.getPutSocket = function(filepath, callback) {
+ this.getPutSocket = function(path, callback) {
var self = this;
- this.getPassiveSocket(function(err, socket) {
+ this.getPasvSocket(function(err, socket) {
if (err) return callback(err);
- self.enqueue(["stor " + filepath]);
- setTimeout(function() {
- callback(null, socket);
+
+ var hadErr;
+ self._enqueueCmd("stor " + path, function(err, res) {
+ if (err) hadErr = err;
+ });
+
+ // This is not a great solution. A STOR command only gives back an error
+ // or a Mark. Marks are not very reliable, which is why jsftp doesn't
+ // take them into account. But then if STOR is successful we never
+ // know because we don't receive any response.
+ // Still, the socket is just open and we can start to pump, so we set
+ // a timeout and after that we pass the socket to the callback along
+ // with any errors that STOR gave back.
+ // A way to make this more reliable would be to look at the response
+ // mark (Should be 150) and only then if it is ok send the socket back.
+ setTimeout(function(err, res) {
+ callback(hadErr, socket);
}, 100);
});
};
/**
+ * Unfortunately node 0.6 has some serious problems with streams, namely that
+ * we can't count on socket.pause() actually pausing the stream, so we expand
+ * its behavior to buffer any data that is transmitted between pausing and
+ * resuming.
+ *
+ * @param port {Number} Socket port
+ * @param host {String} Socket host
+ * @returns {Net.socket}
+ */
+ this._getStreamableSocket = function(port, host) {
+ var socket = Net.createConnection(port, host);
+ var buffer = [];
+ var pause = socket.pause;
+ var pauseCount = 0;
+
+ socket.pause = function() {
+ if (++pauseCount !== 1) return;
+
+ pause.call(this);
+ socket.on("data", onData);
+ socket.on("end", onEnd);
+ socket.on("close", onClose);
+ socket.on("error", onError);
+ };
+ function onData(chunk) { buffer.push(["data", chunk]); }
+ function onEnd() { buffer.push(["end"]); }
+ function onClose(hadError) { buffer.push(["close", hadError]); }
+ function onError(err) { buffer.push(["error", err]); }
+
+ var resume = socket.resume;
+ socket.resume = function() {
+ if (pauseCount === 0) return;
+ if (--pauseCount !== 0) return;
+
+ this.removeListener("data", onData);
+ this.removeListener("end", onEnd);
+ this.removeListener("close", onClose);
+ this.removeListener("error", onError);
+
+ buffer.forEach(socket.emit.bind(socket));
+ resume.call(this);
+ };
+
+ return socket;
+ };
+
+ this.getPasvSocket = function(callback) {
+ var self = this;
+ var doPasv = function(err, res) {
+ if (err || !res || res.code !== 227) {
+ if (res && res.text)
+ return callback(new Error(res.text));
+ else if (err)
+ return callback(err);
+ else
+ return callback(new Error("Unknown error when trying to get into PASV mode"));
+ }
+
+ var port = Ftp.getPasvPort(res.text);
+ if (port === false)
+ return callback(new Error("PASV: Bad port"));
+
+ var socket = self._getStreamableSocket(port, self.host);
+ socket.setTimeout(self.timeout || TIMEOUT);
+ callback(null, socket);
+ };
+ self._enqueueCmd("pasv", doPasv);
+ };
+
+ /**
+ * Sequential passive mode. When this function is used on a command, it will
+ * wait until the current command is executed, and then it will proceed to
+ * execute the next command in the queue.
+ *
+ * The options object contains two options:
+ * - options.cmd: is an array containing the command string e.g. ["stor " + filePath]
+ * - options.concat: Whether the result of the operation should be concatenated
+ * and delivered to the allback when finished.
+ *
+ * @param options {Object}: Contains the options for this function
+ * @param callback {Function}: Function to execute when the data socket closes (either by success or by error).
+ */
+ this.orderedPasv = function(options, callback) {
+ var self = this;
+ // If there is a passive call happening, we put the requested passive
+ // call in the passive call buffer, to be executed later.
+ var fn = function() {
+ this.getPasvSocket(function(err, socket) {
+ if (err) return callback(err);
+
+ // Executes the next passive call, if there are any.
+ var nextPasv = function nextPasv(err) {
+ self.currentPasv = null;
+ if (self.pasvCallBuffer.length) {
+ self.currentPasv = self.pasvCallBuffer.shift();
+ self.currentPasv(callback);
+ }
+ };
+
+ // On each one of the events below we want to move on to the
+ // next passive call, if any.
+ socket.on("close", nextPasv);
+ if (options.concat)
+ Ftp._concatStream(err, socket, callback);
+ else
+ callback(err, socket);
+
+ self._enqueueCmd(options.cmd);
+ });
+ };
+
+ if (this.currentPasv) {
+ this.pasvCallBuffer.push(fn);
+ }
+ // otherwise, execute right away because there are no passive calls
+ // happening right now.
+ else {
+ this.currentPasv = fn;
+ this.currentPasv();
+ }
+ },
+
+ /**
* Provides information about files. It lists a directory contents or
* a single file and yields an array of file objects. The file objects
* contain several properties. The main difference between this method and
@@ -687,7 +763,6 @@ Ftp.getPasvPort = function(text) {
var self = this;
self.raw.rnfr(from, function(err, res) {
if (err) return callback(err);
-
self.raw.rnto(to, function(err, res) { callback(err, res); });
});
};
@@ -701,7 +776,7 @@ Ftp.getPasvPort = function(text) {
};
}).call(Ftp.prototype);
-function concat(bufs) {
+Ftp._concat = function(bufs) {
var buffer, length = 0, index = 0;
if (!Array.isArray(bufs))
@@ -720,13 +795,18 @@ function concat(bufs) {
});
return buffer;
-}
+};
-function concatStream(err, socket, callback) {
+Ftp._concatStream = function(err, socket, callback) {
if (err) return callback(err);
var pieces = [];
socket.on("data", function(p) { pieces.push(p); });
- socket.on("end", function() { callback(null, concat(pieces)); });
- socket.on("error", function(e) { callback(e); });
-}
+ socket.on("close", function(hadError) {
+ if (hadError)
+ return callback(new Error("Socket connection error"));
+
+ callback(null, Ftp._concat(pieces));
+ });
+ socket.resume();
+};
Oops, something went wrong.

0 comments on commit 486b017

Please sign in to comment.