Skip to content

Commit

Permalink
Merge pull request #35 from bancek/streaming-marks
Browse files Browse the repository at this point in the history
Use marks for streaming
  • Loading branch information
Sergi Mansilla committed Dec 1, 2012
2 parents 5a6a1d5 + 1ce781f commit acf4911
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 41 deletions.
95 changes: 54 additions & 41 deletions lib/jsftp.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Ftp.queue = function() {
var stream = function stream($, stop) {
next = $;
stream._update();
stream.next = $;
};

stream._update = function _update() {
Expand Down Expand Up @@ -133,13 +134,7 @@ Ftp.getPasvPort = function(text) {
// a stream that keeps yielding command/response pairs as soon as each pair
// becomes available.
var pairStreamer = S.zip(
// We ignore FTP marks for now. They don't convey useful
// information. A more elegant solution should be found in the
// future.
S.filter(
function(x) { return !Ftp.isMark(x.code); },
self.serverResponse(input)
),
self.serverResponse(input),
// Stream of FTP commands from the client.
S.append(S.list(null), function(next, stop) { cmd = next; })
);
Expand Down Expand Up @@ -370,23 +365,30 @@ Ftp.getPasvPort = function(text) {
var ftpResponse = action[0];
var command = action[1];
var callback = command[1];
if (callback) {
if (!ftpResponse) {
callback(new Error("FTP response not defined"));
}
// In FTP every response code above 399 means error in some way.
// Since the RFC is not respected by many servers, we are going to
// overgeneralize and consider every value above 399 as an error.
else if (ftpResponse.code > 399) {
var err = new Error(ftpResponse.text || "Unknown FTP error.");
err.code = ftpResponse.code;
callback(err);
}
else {
if (ftpResponse != null && Ftp.isMark(ftpResponse.code)) {
if (callback != null && callback.acceptsMarks) {
callback(null, ftpResponse);
}
this.cmdQueue.next(['', callback]);
} else {
if (callback) {
if (!ftpResponse) {
callback(new Error("FTP response not defined"));
}
// In FTP every response code above 399 means error in some way.
// Since the RFC is not respected by many servers, we are going to
// overgeneralize and consider every value above 399 as an error.
else if (ftpResponse.code > 399) {
var err = new Error(ftpResponse.text || "Unknown FTP error.");
err.code = ftpResponse.code;
callback(err);
}
else {
callback(null, ftpResponse);
}
}
this.nextCmd();
}
this.nextCmd();
};

this._initialize = function(callback) {
Expand Down Expand Up @@ -553,9 +555,18 @@ Ftp.getPasvPort = function(text) {
if (err) return callback(err);

socket.pause();
self._enqueueCmd("retr " + path, function(err, res) {
callback(err, socket);
});

var called = false;

var cmdCallback = function(err, res) {
if (!called) {
called = true;
callback(err, socket);
}
};
cmdCallback.acceptsMarks = true;

self._enqueueCmd("retr " + path, cmdCallback);
});
};

Expand All @@ -575,28 +586,30 @@ Ftp.getPasvPort = function(text) {
});
};

this.getPutSocket = function(path, callback) {
this.getPutSocket = function(path, callback, doneCallback) {
var self = this;
this.getPasvSocket(function(err, socket) {
if (err) return callback(err);

var hadErr;
self._enqueueCmd("stor " + path, function(err, res) {
if (err) hadErr = err;
});
var called = false;

// This is not a great solution. A STOR command only gives back an error
// or a Mark. Marks are not very reliable, which is why jsftp doesn't
// take them into account. But then if STOR is successful we never
// know because we don't receive any response.
// Still, the socket is just open and we can start to pump, so we set
// a timeout and after that we pass the socket to the callback along
// with any errors that STOR gave back.
// A way to make this more reliable would be to look at the response
// mark (Should be 150) and only then if it is ok send the socket back.
setTimeout(function(err, res) {
callback(hadErr, socket);
}, 100);
var cmdCallback = function(err, res) {
if (!called) {
called = true;
callback(err, socket);
} else {
if (doneCallback) {
if (err) {
doneCallback(err)
} else {
doneCallback(err, res)
}
}
}
};
cmdCallback.acceptsMarks = true;

self._enqueueCmd("stor " + path, cmdCallback);
});
};

Expand Down
70 changes: 70 additions & 0 deletions test/jsftp_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,76 @@ describe("jsftp test suite", function() {
function handler() { counter++; }
});

it("test get a big file stream", function(next) {
var remotePath = remoteCWD + "/bigfile.test";

var data = (new Array(1*1024*1024)).join("x");

var buffer = new Buffer(data, "binary");

ftp.put(remotePath, buffer, function(err, res) {
assert.ok(!err, err);

ftp.getGetSocket(remotePath, function(err, socket) {
assert.ok(!err, err);

socket.resume();

var counter = 0;

socket.on('data', function(data) {
counter += data.length;
});

socket.on('close', function() {
assert.equal(buffer.length, counter);

ftp.raw.dele(remotePath, function(err, data) {
assert.ok(!err);
next();
});
});
});
});
});

it("test put a big file stream", function(next) {
var remotePath = remoteCWD + "/bigfile.test";

var data = (new Array(1*1024*1024)).join("x");

var buffer = new Buffer(data, "binary");

ftp.getPutSocket(remotePath, function(err, socket) {
assert.ok(!err, err);

socket.write(data, function(err) {
assert.ok(!err, err);

socket.end();
});
}, function(err, res) {
assert.equal(res.code, 226);

ftp.raw.dele(remotePath, function(err, data) {
assert.ok(!err);
next();
});
});
});

it("test put a big file stream fail", function(next) {
var remotePath = remoteCWD + "/nonexisting/path/to/file.txt";

ftp.getPutSocket(remotePath, function(err, socket, res) {
assert.ok(err, err);
assert.equal(err.code, 550);
next();
}, function(err, res) {
assert.ok(false);
});
});

it("test get fileList array", function(next) {
var file1 = "testfile.txt";

Expand Down

0 comments on commit acf4911

Please sign in to comment.