Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Normal commands and passive commands are enqueued now.

  • Loading branch information...
commit 47816c390300caa1d7d50e25a1a20d1aeb0e4aaa 1 parent 62da72c
Sergi Mansilla authored
Showing with 97 additions and 59 deletions.
  1. +91 −51 jsftp.js
  2. +6 −8 lib/ftpPasv.js
142 jsftp.js
View
@@ -32,28 +32,35 @@ var COMMANDS = [
"SYST", "CHMOD", "SIZE"
];
-// Simplified version of https://github.com/Gozala/streamer/blob/queue/queue.js
function queue() {
- var next, buffer = Array.prototype.slice.call(arguments)
- function stream($, stop) { next = $; stream._update() }
- stream._update = function _update() {
- buffer.push.apply(buffer, arguments)
- if (next && buffer.length) {
- if (false !== next(buffer.shift())) _update()
+ var next;
+ var buffer = Array.prototype.slice.call(arguments);
+
+ function stream($, stop) {
+ next = $;
+ stream._update();
}
- }
- return stream
+
+ stream._update = function _update() {
+ buffer.push.apply(buffer, arguments);
+ if (next && buffer.length) {
+ if (false !== next(buffer.shift()))
+ _update();
+ else
+ next = null;
+ }
+ };
+ return stream;
}
function enqueue(stream, element) {
- stream._update.apply(null, Array.prototype.slice.call(arguments, 1))
+ stream._update.apply(null, Array.prototype.slice.call(arguments, 1));
}
-
-// This array contains the codes for special commands, such as RETR or STOR,
-// which send two responses instead of one. Not a multiline response, but
-// literally two.
-var SPECIAL_CMDS = [150, 125];
+// Codes from 100 to 200 are FTP marks
+function isMark(code) {
+ return code > 100 && code < 200;
+};
var Ftp = module.exports = function(cfg) {
this.options = cfg;
@@ -88,8 +95,11 @@ var Ftp = module.exports = function(cfg) {
action += " " + args.join(" ");
}
self.keepAlive();
- self.push(action, callback);
- //if (action.indexOf("stat") > -1)
+
+ enqueue(self.cmdQueue, {
+ cmd: action,
+ callback: callback
+ });
};
});
@@ -106,12 +116,14 @@ var Ftp = module.exports = function(cfg) {
* command into `cmds` list. This command will get paired with its response
* once that one is received
*/
- this.push = function(command, callback) {
+ this.push = function(cmdObj) {
var self = this;
-
+ var command = cmdObj.cmd;
+ var callback = cmdObj.callback;
function send() {
cmd([command, callback]);
+ console.log("WRITING", command)
socket.write(command + "\r\n");
}
@@ -127,7 +139,7 @@ var Ftp = module.exports = function(cfg) {
send();
}
else {
- console.log("FTP socket is not writable, reopening socket...")
+ console.log("FTP socket is not writable, reopening socket...");
if (!this.connecting) {
this.connecting = true;
@@ -160,17 +172,23 @@ var Ftp = module.exports = function(cfg) {
socket.on("close", stop);
};
- var cmds, tasks, pasvQueue, pasvReqs, pasv;
+ var cmds, tasks;
var createStreams = function() {
- pasvReqs = self.pasvReqs = queue()
-
- !function recur() {
- // Take first element from `pasvReqs` and process it via `processElement` black box that will call
- // `recur` once it's done to process next element from the `pasvReqs` queue.
- S.head(pasvReqs)(function(element) {
- self.processPasv(element, recur)
- })
- }()
+ self.cmdQueue = queue();
+ (self.nextCmd = function nextCmd() {
+ S.head(self.cmdQueue)(function(obj) {
+ self.push(obj, self.nextCmd);
+ });
+ })();
+
+ self.pasvQueue = queue();
+ (self.nextPasv = function nextPasv() {
+ // Take first element from `pasvQueue` and process it via `processElement` black box that will call
+ // `recur` once it's done to process next element from the `pasvQueue` queue.
+ S.head(self.pasvQueue)(function(element) {
+ self.processPasv(element, self.nextPasv);
+ });
+ })();
// Stream of FTP commands from the client.
cmds = function(next, stop) {
@@ -182,7 +200,10 @@ var Ftp = module.exports = function(cfg) {
* a stream that keeps yielding command/response pairs as soon as each pair
* becomes available.
*/
- tasks = S.zip(self.serverResponse(input), S.append(S.list(null), cmds));
+ tasks = S.zip(S.filter(function(x) {
+ return !isMark(x.code);
+ }, self.serverResponse(input)), S.append(S.list(null), cmds));
+
tasks(self.parse.bind(self), function(err) {
console.log("Ftp socket closed its doors to the public.", err || "");
if (err && self.onError)
@@ -194,7 +215,6 @@ var Ftp = module.exports = function(cfg) {
createStreams();
this.cmd = cmd;
- this.pasv = pasv;
this.connected = false;
};
@@ -242,6 +262,7 @@ var Ftp = module.exports = function(cfg) {
return function stream(next, stop) {
source(function(data) {
+ console.log("RECEIVING", data)
var lines = data.replace(RE_NL_END, "").replace(RE_NL, NL).split(NL);
lines.forEach(function(line) {
@@ -250,6 +271,7 @@ var Ftp = module.exports = function(cfg) {
if (simpleRes) {
var code = parseInt(simpleRes[1], 10);
+
if (buffer.length) {
buffer.push(line);
@@ -264,10 +286,15 @@ var Ftp = module.exports = function(cfg) {
// commands (see above), insert a dummy command to pair
// it up properly with its response, and avoid messing
// up the zipped streams.
- if (SPECIAL_CMDS.indexOf(code) > -1)
- self.cmd(null);
- else
+ //if (isMark(code)) {
+ //enqueue(self.cmdQueue, {
+ //cmd: null,
+ //callback: null
+ //});
+ //}
+ //else {
next({ code: code, text: line });
+ //}
}
else {
if (!buffer.length && (multiRes = RE_MULTI.exec(line)))
@@ -316,9 +343,11 @@ var Ftp = module.exports = function(cfg) {
// Since the RFC is not respected by many servers, we are goiong to
// overgeneralize and consider every value above 399 as an error.
var hasFailed = ftpResponse && ftpResponse.code > 399;
- var error = hasFailed ? ftpResponse.text : null;
+ var error = hasFailed && ftpResponse.text;
callback(error, ftpResponse);
}
+
+ this.nextCmd();
};
this._initialize = function(callback) {
@@ -456,29 +485,40 @@ var Ftp = module.exports = function(cfg) {
* @param onConnect {Function} Function to call when the passive socket connects
*/
this.setPassive = function(pasvData) {
- enqueue(this.pasvReqs, pasvData);
+ enqueue(this.pasvQueue, pasvData);
};
this.processPasv = function(pasvData, next) {
var self = this;
var callback = function(err, res) {
- console.log("get here")
- pasvData.callback(err, res);
+ pasvData.callback && pasvData.callback(err, res);
next();
};
- this.raw.pasv(function(err, res) {
- if (err || res.code !== 227)
- return callback(res.text);
+ var doPasv = function doPasv() {
+ self.raw.pasv(function(err, res) {
+ if (err || res.code !== 227)
+ return callback(res.text);
+
+ var match = RE_PASV.exec(res.text);
+ if (!match)
+ return callback("PASV: Bad port "); // bad port
+
+ var port = (parseInt(match[1], 10) & 255) * 256 + (parseInt(match[2], 10) & 255);
+ self.dataConn = new ftpPasv(self.host, port, pasvData.mode, callback, pasvData.onConnect);
+ });
+ };
+
+ if (this.dataConn && this.dataConn.writable) {
+ this.dataConn.on("close", doPasv);
+ this.dataConn.end();
+ }
+ else {
+ doPasv();
+ }
- var match = RE_PASV.exec(res.text);
- if (!match)
- return callback("PASV: Bad port "); // bad port
- var port = (parseInt(match[1], 10) & 255) * 256 + (parseInt(match[2], 10) & 255);
- this.dataConn = new ftpPasv(self.host, port, pasvData.mode, callback, pasvData.onConnect);
- });
};
/**
@@ -587,7 +627,7 @@ var Ftp = module.exports = function(cfg) {
if ((err && data.code === 502) ||
(self.system && self.system.indexOf("hummingbird") > -1)) {
self.list(filePath, function(err, data) {
- entriesToList(err, data)
+ entriesToList(err, data);
});
}
else {
@@ -617,9 +657,9 @@ var Ftp = module.exports = function(cfg) {
if (err)
return callback(err);
- raw.rnto(to, function(err, res) { callback(err, res) });
+ raw.rnto(to, function(err, res) { callback(err, res); });
});
- }
+ };
this.keepAlive = function() {
if (this._keepAliveInterval)
14 lib/ftpPasv.js
View
@@ -11,7 +11,7 @@ try { S = require("streamer"); }
catch (e) { S = require("../support/streamer/core"); }
var ftpPasv = module.exports = function(host, port, mode, callback, onConnect) {
- var data = [];
+ var data;
var socket = this.socket = Net.createConnection(port, host);
socket.setEncoding("utf8");
@@ -32,14 +32,12 @@ var ftpPasv = module.exports = function(host, port, mode, callback, onConnect) {
var self = this;
var requests = function(source) {
source(function(result) {
- data.push(result);
+ if (mode === "I")
+ data = concat([data || [], result]);
+ else
+ data = [data, result].join("\n");
}, function(error) {
- if (error)
- return callback(error);
-
- console.log("CALLBACK", callback)
- callback(null, mode === "I" ? concat(data) : data.join("\n"));
- socket.destroy();
+ callback(error, data);
});
};
Please sign in to comment.
Something went wrong with that request. Please try again.