Skip to content

Commit

Permalink
Conversion to ReadableStream and WritableStream.
Browse files Browse the repository at this point in the history
- Add new methods downloadReadable
  and uploadWritable to use new interfaces
  for file operations.
  • Loading branch information
nullobsi committed Dec 23, 2023
1 parent f95fe1d commit 6364506
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 58 deletions.
152 changes: 108 additions & 44 deletions classes/FTPClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ import * as Regexes from "../util/regexes.ts";
import free from "../util/free.ts";
import { FeatMatrix, FEATURES } from "../types/FeatMatrix.ts";
import {
iterateReader,
readAll,
writeAll,
} from "https://deno.land/std@0.166.0/streams/conversion.ts";
toArrayBuffer,
toText,
TextLineStream,
} from "https://deno.land/std@0.210.0/streams/mod.ts";
import { FTPFileInfo } from "../types/FTPFileInfo.ts";
import FTPReply from "../types/FTPReply.ts";

export class FTPClient implements Deno.Closer {
private conn?: Deno.Conn;
private connLineReader?: ReadableStream<string>;

private dataConn?: Deno.Conn;

private activeListener?: Deno.Listener;

private opts: IntConnOpts;
private encode = new TextEncoder();
private decode = new TextDecoder();

private feats: FeatMatrix;

Expand Down Expand Up @@ -218,19 +220,21 @@ export class FTPClient implements Deno.Closer {
* @param fileName
*/
public async download(fileName: string) {
const downloadConn = await this.downloadStream(fileName);
const data = await readAll(downloadConn);

const readable = await this.downloadReadable(fileName);
const data = await toArrayBuffer(readable);

await this.finalizeStream();

return data;
return new Uint8Array(data);
}

/**
* Download a file from the server with streaming.
* **Please call FTPClient.finalizeStream()** to release the lock after the file is done downloading.
* @param fileName
* Download a file from the server using a ReadableStream interface.
* **Please call FTPClient.finalizeStream** to release the lock
* after the file is downloaded.
*/
public async downloadStream(fileName: string): Promise<Deno.Reader> {
public async downloadReadable(fileName: string): Promise<ReadableStream> {
await this.lock.lock();
if (this.conn === undefined) {
this.lock.unlock();
Expand All @@ -252,8 +256,24 @@ export class FTPClient implements Deno.Closer {
);
}

const conn = await this.finalizeDataConnection();
return conn.readable;
}

return await this.finalizeDataConnection();
/**
* Download a file from the server with streaming.
* **Please call FTPClient.finalizeStream()** to release the lock after the file is done downloading.
* @deprecated Use downloadReadable instead.
* @param fileName
*/
public async downloadStream(fileName: string): Promise<Deno.Reader> {
await this.downloadReadable(fileName);

if (!this.dataConn) {
throw new Error("Could not get download stream!");
}

return this.dataConn;
}

/**
Expand All @@ -262,18 +282,22 @@ export class FTPClient implements Deno.Closer {
* @param data
*/
public async upload(fileName: string, data: Uint8Array) {
const conn = await this.uploadStream(fileName, data.byteLength);
await writeAll(conn, data);
const writable = await this.uploadWritable(fileName, data.byteLength);
const writer = writable.getWriter();

await writer.write(data);

await this.finalizeStream();
}

/**
* Upload a file to the server, with streaming.
* **Please call FTPClient.finalizeStream()** to release the lock after the file is done downloading.**
* Upload a file using a WritableStream interface.
* **Please call FTPClient.finalizeStream()** to release the lock after
* the file is uploaded.
* @param fileName
* @param allocate Number of bytes to allocate to the file. Some servers require this parameter.
*/
public async uploadStream(fileName: string, allocate?: number) {
public async uploadWritable(fileName: string, allocate?: number): Promise<WritableStream> {
await this.lock.lock();
if (this.conn === undefined) {
this.lock.unlock();
Expand Down Expand Up @@ -308,14 +332,34 @@ export class FTPClient implements Deno.Closer {
);
}

return await this.finalizeDataConnection();
const conn = await this.finalizeDataConnection();

return conn.writable;
}

/**
* Upload a file to the server, with streaming.
* **Please call FTPClient.finalizeStream()** to release the lock after the file is done downloading.**
* @param fileName
* @param allocate Number of bytes to allocate to the file. Some servers require this parameter.
* @deprecated Use uploadWritable instead.
*/
public async uploadStream(fileName: string, allocate?: number): Promise<Deno.Writer> {
await this.uploadWritable(fileName, allocate);

if (!this.dataConn) {
throw new Error("Failed to get upload channel!");
}

return this.dataConn;
}

/**
* Unlock and close connections for streaming.
*/
public async finalizeStream() {
free(this.dataConn);
this.dataConn = undefined;

const res = await this.getStatus();
this.assertStatus(StatusCodes.DataClose, res);
Expand All @@ -338,14 +382,19 @@ export class FTPClient implements Deno.Closer {
birthtime: null,
blksize: null,
blocks: null,
dev: null,
dev: NaN,
gid: null,
ino: null,
mode: null,
nlink: null,
rdev: null,
uid: null,

isBlockDevice: null,
isFifo: null,
isSocket: null,
isCharDevice: null,

mtime: null,
ctime: null,
isSymlink: false,
Expand Down Expand Up @@ -552,14 +601,19 @@ export class FTPClient implements Deno.Closer {
birthtime: null,
blksize: null,
blocks: null,
dev: null,
dev: NaN,
gid: null,
ino: null,
mode: null,
nlink: null,
rdev: null,
uid: null,

isCharDevice: null,
isFifo: null,
isSocket: null,
isBlockDevice: null,

mtime: null,
ctime: null,
isSymlink: false,
Expand Down Expand Up @@ -664,7 +718,12 @@ export class FTPClient implements Deno.Closer {
const encoded = this.encode.encode(
`${c.toString()}${args ? " " + args : ""}\r\n`,
);
await this.conn.write(encoded);

const writer = this.conn.writable.getWriter();
await writer.write(encoded);

writer.releaseLock();

return await this.getStatus();
}

Expand All @@ -682,44 +741,49 @@ export class FTPClient implements Deno.Closer {
}

const conn = await this.finalizeDataConnection();
const data = await readAll(conn);
const text = await toText(conn.readable);

free(conn);

res = await this.getStatus();
this.assertStatus(StatusCodes.DataClose, res);

this.lock.unlock();

return this.decode.decode(data);
return text;
}

//parse response from FTP control channel
private async getStatus(): Promise<FTPReply> {
if (!this.conn) throw FTPClient.notInit();

let s = "";
const iter = iterateReader(this.conn);

for await (const a of iter) {
const decoded = this.decode.decode(a);
s += decoded;
if (s[3] !== "-") {
if (s.endsWith("\r\n")) {

if (!this.connLineReader) {
this.connLineReader = this.conn.readable
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream());
}

const lines: string[] = [];
for await (const line of this.connLineReader.values({ preventCancel: true })) {
lines.push(line);
if (lines.length > 1) {
// Status Code + SPACE signifies end.
if (line.startsWith(lines[0].substring(0,3) + " ")) {
break;
}
} else {
const i = s.lastIndexOf("\r\n");
if (i !== -1) {
const pi = s.lastIndexOf("\r\n", i - 1);
const lastLine = s.substring(pi + 2, i);
if (lastLine.startsWith(s.substring(0, 3))) {
break;
}
}
}
// Not a multi-line message. Continue.
else if (lines[0][3] !== "-") break;
}

const statusCode = parseInt(lines[0].substring(0, 3));

if (lines.length > 1) {
const lastLine = lines[lines.length - 1];
lines[lines.length - 1] = lastLine.slice(4);
}
const statusCode = parseInt(s.substring(0, 3));
const message = s.length > 3 ? s.substring(4).trimEnd() : "";

const message = lines.join("\r\n").slice(4);

return {
code: statusCode,
Expand Down
13 changes: 6 additions & 7 deletions examples/download.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
// Requires --allow-net and --allow-write
import { FTPClient } from "https://deno.land/x/ftpc/mod.ts";
import { FTPClient } from "../mod.ts";

// Connect as anonymous user
let client = new FTPClient("speedtest.tele2.net");
const client = new FTPClient("speedtest.tele2.net");
await client.connect();
console.log("Connected!");

// Download test file
console.log("Downloading...");
let file = await Deno.open("./5MB.zip", {
const file = await Deno.open("./5MB.zip", {
create: true,
write: true,
});
let stream = await client.downloadStream("5MB.zip");
await Deno.copy(stream, file);
const stream = await client.downloadReadable("5MB.zip");
await stream.pipeTo(file.writable);

// Close download stream
// Close download stream. File is already closed by pipeTo method.
await client.finalizeStream();
file.close();
console.log("Finished!");

// Log off server
Expand Down
14 changes: 7 additions & 7 deletions examples/upload.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { FTPClient } from "../mod.ts";

// Create a connection to an FTP server
let client = new FTPClient("ftp.server", {
const client = new FTPClient("ftp.server", {
// Enable TLS
tlsOpts: {
implicit: false,
Expand All @@ -21,26 +21,26 @@ let client = new FTPClient("ftp.server", {
await client.connect();

// Generate random data
let randomData = new Uint8Array(4096);
const randomData = new Uint8Array(4096);
crypto.getRandomValues(randomData);

// CD into the folder "files" on the server
await client.chdir("files");

// Create a stream to upload the file random.bin with a size of 4096 bytes
let uploadStream = await client.uploadStream("random.bin", 4096);
await uploadStream.write(randomData);
const uploadStream = await client.uploadWritable("random.bin", 4096);
uploadStream.getWriter().write(randomData);

// Close the stream and notify the server that file upload is complete
await client.finalizeStream();

// Redownload the file from the server
let downloadedData = await client.download("random.bin");
const downloadedData = new Uint8Array(await client.download("random.bin"));

// Compare the files
for (let i = 0; i < randomData.length; i++) {
let n1 = randomData[i];
let n2 = downloadedData[i];
const n1 = randomData[i];
const n2 = downloadedData[i];
if (n1 !== n2) {
console.log(`Files are not the same at ${i}!`);
}
Expand Down

0 comments on commit 6364506

Please sign in to comment.