Skip to content

Commit

Permalink
segmented-object: reimplement fetcher logic
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed May 5, 2024
1 parent 6297fb4 commit 311efda
Show file tree
Hide file tree
Showing 7 changed files with 399 additions and 623 deletions.
2 changes: 2 additions & 0 deletions pkg/segmented-object/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
"@ndn/util": "workspace:*",
"event-iterator": "^2.0.0",
"hirestime": "^7.0.4",
"it-keepalive": "^1.2.0",
"mnemonist": "^0.39.8",
"obliterator": "^2.0.4",
"p-defer": "^4.0.1",
"p-event": "^6.0.1",
"p-lazy": "^4.0.0",
Expand Down
77 changes: 50 additions & 27 deletions pkg/segmented-object/src/fetch/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,43 @@
import { type Data, Name, type NameLike } from "@ndn/packet";
import type { ConsumerOptions, Endpoint } from "@ndn/endpoint";
import { type Data, Name, type NameLike, type Verifier } from "@ndn/packet";
import { assert, concatBuffers, Reorder } from "@ndn/util";
import EventIterator from "event-iterator";
import { collect, map, type WritableStreamish, writeToStream } from "streaming-iterables";
// import EventIterator from "event-iterator";
import { collect, map, parallelMap, type WritableStreamish, writeToStream } from "streaming-iterables";
import type { Promisable } from "type-fest";

import { Fetcher } from "./fetcher";
import { type SegData, UnverifiedFetcher, type UnverifiedFetcherOptions } from "./unverified";

class FetchResult implements fetch.Result {
constructor(private readonly name: Name, private readonly opts: fetch.Options) {}

public get count(): number { return this.ctx?.count ?? 0; }
private ctx?: Fetcher;
public get count(): number { return this.uvf?.count ?? 0; }
private uvf?: UnverifiedFetcher;
private promise?: Promise<Uint8Array>;

private startFetcher() {
assert(!this.ctx, "fetch.Result is already used");
const ctx = new Fetcher(this.name, this.opts);
this.ctx = ctx;
return new EventIterator<Fetcher.SegmentData>(({ push, stop, fail, on }) => {
let resume: (() => void) | undefined;
on("highWater", () => { resume = ctx.pause(); });
on("lowWater", () => { resume?.(); });

const abort = new AbortController();
ctx.addEventListener("segment", push, { signal: abort.signal });
ctx.addEventListener("end", stop, { signal: abort.signal });
ctx.addEventListener("error", ({ detail }) => fail(detail), { signal: abort.signal });
return () => {
resume?.();
abort.abort();
};
});
private startFetcher(): AsyncIterable<SegData> {
assert(!this.uvf, "fetch.Result is already used");
const opts = {
...this.opts.endpoint?.cOpts, // eslint-disable-line etc/no-deprecated
...this.opts.cOpts,
...this.opts,
};
this.uvf = new UnverifiedFetcher(this.name, opts);
return parallelMap(16, async ({ seg, data }) => {
await opts.verifier?.verify(data);
return { seg, data };
}, this.uvf.fetch());
}

public unordered() {
return map(
({ data, segNum }) => Object.assign(data, { segNum }),
({ data, seg: segNum }) => Object.assign(data, { segNum }),
this.startFetcher(),
);
}

private async *ordered() {
const reorder = new Reorder<Data>(this.opts.segmentRange?.[0]);
for await (const { segNum, data } of this.startFetcher()) {
for await (const { seg: segNum, data } of this.startFetcher()) {
reorder.push(segNum, data);
yield* reorder.shift();
}
Expand Down Expand Up @@ -91,7 +86,35 @@ export function fetch(name: NameLike, opts: fetch.Options = {}): fetch.Result {

export namespace fetch {
/** {@link fetch} options. */
export interface Options extends Fetcher.Options {}
export interface Options extends UnverifiedFetcherOptions {
/**
* Inherit fetcher options from Endpoint consumer options.
* @deprecated Specify `.cOpts`.
*/
endpoint?: Endpoint;

/**
* Inherit fetcher options from consumer options.
*
* @remarks
* These options are inherited if the corresponding fetcher option is unset:
* - `describe`
* - `fw`
* - `modifyInterest`
* - `signal`
* - `verifier`
*
* Other options cannot be inherited, notably:
* - `retx`
*/
cOpts?: ConsumerOptions;

/**
* Data verifier.
* @defaultValue noopSigning
*/
verifier?: Verifier;
}

/**
* Return type of {@link fetch} function.
Expand Down
228 changes: 0 additions & 228 deletions pkg/segmented-object/src/fetch/fetcher.ts

This file was deleted.

Loading

0 comments on commit 311efda

Please sign in to comment.