Skip to content

Commit

Permalink
cat: use fileserver.Client
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jan 22, 2024
1 parent fcdbe74 commit 3e8faa8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 66 deletions.
94 changes: 29 additions & 65 deletions packages/cat/src/file-client.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import fs from "node:fs";
import fsPromises from "node:fs/promises";
import fs from "node:fs/promises";
import path from "node:path/posix";

import { FileMetadata, lsKeyword, parseDirectoryListing } from "@ndn/fileserver";
import { Component, type ComponentLike, Name } from "@ndn/packet";
import { retrieveMetadata } from "@ndn/rdr";
import { fetch } from "@ndn/segmented-object";
import { console } from "@ndn/util";
import { Client } from "@ndn/fileserver";
import { Name } from "@ndn/packet";
import { assert, console } from "@ndn/util";
import { pushable } from "it-pushable";
import { consume, parallelMap, writeToStream } from "streaming-iterables";
import { consume, parallelMap } from "streaming-iterables";
import type { CommandModule } from "yargs";

import { type CommonArgs, Segment } from "./util";
Expand Down Expand Up @@ -51,8 +48,7 @@ export const FileClientCommand: CommandModule<CommonArgs, Args> = {

handler({ remote, local, jobs, retx }) {
const dl = new Downloader(remote, local, jobs, retx);
const abort = new AbortController();
return dl.run(abort.signal);
return dl.run();
},
};

Expand All @@ -66,14 +62,18 @@ class Downloader {
this.local = path.resolve(local);
}

private client!: Client;
private readonly local: string;
private readonly queue = pushable<Job>({ objectMode: true });
private signal!: AbortSignal;
private nProcessing = 0;
private nQueued = 0;

public async run(signal: AbortSignal) {
this.signal = signal;
public async run() {
this.client = new Client(this.remote, {
segmentNumConvention: Segment,
retx: this.retx,
retxLimit: this.retx,
});
this.enqueue("folder", this.local);
await consume(parallelMap(this.jobs, this.processJob, this.queue));
}
Expand Down Expand Up @@ -107,47 +107,19 @@ class Downloader {
}
};

private deriveName(local: string, ...suffix: ComponentLike[]): Name {
const relPath = path.relative(this.local, local);
const relComps = relPath === "" ? [] : relPath.split("/").map((s) => {
if (s === "..") {
throw new Error(`${local} is outside ${this.local}`);
}
return new Component(undefined, s);
});
return this.remote.append(...relComps, ...suffix);
}

private async mFetch(remote: Name): Promise<MFetch> {
const metadata = await retrieveMetadata(remote, FileMetadata, {
retx: this.retx,
signal: this.signal,
});
const { name, lastSeg } = metadata;
return {
metadata,
fetching: fetch(name, {
segmentNumConvention: Segment,
segmentRange: lastSeg === undefined ? undefined : [0, 1 + lastSeg],
estimatedFinalSegNum: lastSeg,
retxLimit: this.retx,
signal: this.signal,
}),
};
private relPath(local: string): string {
const p = path.relative(this.local, local);
assert(!p.startsWith(".."), "path outside top-level directory");
return p;
}

private async downloadFolder(local: string) {
const remote = this.deriveName(local, lsKeyword);
const { metadata: { isDir }, fetching } = await this.mFetch(remote);
if (!isDir) {
throw new Error("not a directory");
}
const ls = await fetching;
const m = await this.client.stat(this.relPath(local));
await fs.mkdir(local, { recursive: true });

await fsPromises.mkdir(local, { recursive: true });
let nFolders = 0;
let nFiles = 0;
for (const { name, isDir } of parseDirectoryListing(ls)) {
for await (const { name, isDir } of this.client.readdir(m)) {
if (isDir) {
this.enqueue("folder", path.resolve(local, name));
++nFolders;
Expand All @@ -160,36 +132,28 @@ class Downloader {
}

private async downloadFile(local: string) {
const remote = this.deriveName(local);
const { metadata: { isFile, atime = new Date(), mtime, size }, fetching } = await this.mFetch(remote);
if (!isFile) {
throw new Error("not a file");
}
const m = await this.client.stat(this.relPath(local));
const fh = await fs.open(local, "w");

const fetching = this.client.readFile(m);

let file: fs.WriteStream | undefined;
let ok = false;
try {
file = fs.createWriteStream(local);
await writeToStream(file, fetching.chunks());
await fetching.pipe(fh.createWriteStream());
ok = true;
} finally {
file?.close();
await fh.close();
if (!ok) {
await fsPromises.unlink(local);
await fs.unlink(local);
}
}

await fsPromises.utimes(local, atime, mtime);
console.log(`FILE ${local} size=${size}`);
await fs.utimes(local, m.atime ?? new Date(), m.mtime);
console.log(`FILE ${local} size=${m.size}`);
}
}

interface Job {
kind: "folder" | "file";
local: string;
}

interface MFetch {
metadata: FileMetadata;
fetching: fetch.Result;
}
2 changes: 1 addition & 1 deletion packages/fileserver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
"types": "lib/mod.d.ts"
},
"dependencies": {
"@ndn/endpoint": "workspace:*",
"@ndn/naming-convention2": "workspace:*",
"@ndn/packet": "workspace:*",
"@ndn/rdr": "workspace:*",
Expand All @@ -36,6 +35,7 @@
"tslib": "^2.6.2"
},
"devDependencies": {
"@ndn/endpoint": "workspace:*",
"streaming-iterables": "^8.0.1"
}
}

0 comments on commit 3e8faa8

Please sign in to comment.