diff --git a/pkg/segmented-object/package.json b/pkg/segmented-object/package.json index 290b3dce..cab74f95 100644 --- a/pkg/segmented-object/package.json +++ b/pkg/segmented-object/package.json @@ -33,7 +33,9 @@ "@ndn/util": "workspace:*", "event-iterator": "^2.0.0", "hirestime": "^7.0.4", + "it-keepalive": "^1.2.0", "mnemonist": "^0.39.8", + "obliterator": "^2.0.4", "p-defer": "^4.0.1", "p-event": "^6.0.1", "p-lazy": "^4.0.0", diff --git a/pkg/segmented-object/src/fetch/fetch.ts b/pkg/segmented-object/src/fetch/fetch.ts index a61f0d22..7201dece 100644 --- a/pkg/segmented-object/src/fetch/fetch.ts +++ b/pkg/segmented-object/src/fetch/fetch.ts @@ -1,48 +1,43 @@ -import { type Data, Name, type NameLike } from "@ndn/packet"; +import type { ConsumerOptions, Endpoint } from "@ndn/endpoint"; +import { type Data, Name, type NameLike, type Verifier } from "@ndn/packet"; import { assert, concatBuffers, Reorder } from "@ndn/util"; -import EventIterator from "event-iterator"; -import { collect, map, type WritableStreamish, writeToStream } from "streaming-iterables"; +// import EventIterator from "event-iterator"; +import { collect, map, parallelMap, type WritableStreamish, writeToStream } from "streaming-iterables"; import type { Promisable } from "type-fest"; -import { Fetcher } from "./fetcher"; +import { type SegData, UnverifiedFetcher, type UnverifiedFetcherOptions } from "./unverified"; class FetchResult implements fetch.Result { constructor(private readonly name: Name, private readonly opts: fetch.Options) {} - public get count(): number { return this.ctx?.count ?? 0; } - private ctx?: Fetcher; + public get count(): number { return this.uvf?.count ?? 0; } + private uvf?: UnverifiedFetcher; private promise?: Promise; - private startFetcher() { - assert(!this.ctx, "fetch.Result is already used"); - const ctx = new Fetcher(this.name, this.opts); - this.ctx = ctx; - return new EventIterator(({ push, stop, fail, on }) => { - let resume: (() => void) | undefined; - on("highWater", () => { resume = ctx.pause(); }); - on("lowWater", () => { resume?.(); }); - - const abort = new AbortController(); - ctx.addEventListener("segment", push, { signal: abort.signal }); - ctx.addEventListener("end", stop, { signal: abort.signal }); - ctx.addEventListener("error", ({ detail }) => fail(detail), { signal: abort.signal }); - return () => { - resume?.(); - abort.abort(); - }; - }); + private startFetcher(): AsyncIterable { + assert(!this.uvf, "fetch.Result is already used"); + const opts = { + ...this.opts.endpoint?.cOpts, // eslint-disable-line etc/no-deprecated + ...this.opts.cOpts, + ...this.opts, + }; + this.uvf = new UnverifiedFetcher(this.name, opts); + return parallelMap(16, async ({ seg, data }) => { + await opts.verifier?.verify(data); + return { seg, data }; + }, this.uvf.fetch()); } public unordered() { return map( - ({ data, segNum }) => Object.assign(data, { segNum }), + ({ data, seg: segNum }) => Object.assign(data, { segNum }), this.startFetcher(), ); } private async *ordered() { const reorder = new Reorder(this.opts.segmentRange?.[0]); - for await (const { segNum, data } of this.startFetcher()) { + for await (const { seg: segNum, data } of this.startFetcher()) { reorder.push(segNum, data); yield* reorder.shift(); } @@ -91,7 +86,35 @@ export function fetch(name: NameLike, opts: fetch.Options = {}): fetch.Result { export namespace fetch { /** {@link fetch} options. */ - export interface Options extends Fetcher.Options {} + export interface Options extends UnverifiedFetcherOptions { + /** + * Inherit fetcher options from Endpoint consumer options. + * @deprecated Specify `.cOpts`. + */ + endpoint?: Endpoint; + + /** + * Inherit fetcher options from consumer options. + * + * @remarks + * These options are inherited if the corresponding fetcher option is unset: + * - `describe` + * - `fw` + * - `modifyInterest` + * - `signal` + * - `verifier` + * + * Other options cannot be inherited, notably: + * - `retx` + */ + cOpts?: ConsumerOptions; + + /** + * Data verifier. + * @defaultValue noopSigning + */ + verifier?: Verifier; + } /** * Return type of {@link fetch} function. diff --git a/pkg/segmented-object/src/fetch/fetcher.ts b/pkg/segmented-object/src/fetch/fetcher.ts deleted file mode 100644 index 8badc2e2..00000000 --- a/pkg/segmented-object/src/fetch/fetcher.ts +++ /dev/null @@ -1,228 +0,0 @@ -import type { ConsumerOptions, Endpoint } from "@ndn/endpoint"; -import { CancelInterest, Forwarder, type FwFace, FwPacket } from "@ndn/fw"; -import { Data, Interest, type Name, type Verifier } from "@ndn/packet"; -import { TypedEventTarget } from "typescript-event-target"; - -import { defaultSegmentConvention, type SegmentConvention } from "../convention"; -import { FetchLogic } from "./logic"; - -type EventMap = { - /** Emitted when a Data segment arrives. */ - segment: Fetcher.SegmentDataEvent; - /** Emitted after all data chunks arrive. */ - end: Event; - /** Emitted upon error. */ - error: CustomEvent; -}; - -/** Fetch Data packets as guided by FetchLogic. */ -export class Fetcher extends TypedEventTarget { - /** Number of segments retrieved so far. */ - public get count() { return this.count_; } - private count_ = 0; - private readonly logic: FetchLogic; - private readonly face: FwFace; - private readonly segmentNumConvention: SegmentConvention; - private readonly modifyInterest: Interest.ModifyFunc; - private readonly signal?: AbortSignal; - private readonly lifetimeAfterRto: number; - private readonly acceptContentType: readonly number[]; - private readonly verifier?: Verifier; - - constructor(private readonly name: Name, opts: Fetcher.Options) { - super(); - - const { - fw = Forwarder.getDefault(), - describe = `fetch(${name})`, - segmentNumConvention = defaultSegmentConvention, - modifyInterest, - signal, - lifetimeAfterRto = 1000, - acceptContentType = [0], - verifier, - } = { - ...opts.endpoint?.cOpts, // eslint-disable-line etc/no-deprecated - ...opts.cOpts, - ...opts, - }; - - this.segmentNumConvention = segmentNumConvention; - this.modifyInterest = Interest.makeModifyFunc(modifyInterest); - this.signal = signal; - this.lifetimeAfterRto = lifetimeAfterRto; - this.acceptContentType = acceptContentType; - this.verifier = verifier; - - this.logic = new FetchLogic(opts); - this.logic.addEventListener("end", () => { - this.dispatchTypedEvent("end", new Event("end")); - this.close(); - }); - this.logic.addEventListener("exceedRetxLimit", ({ detail: segNum }) => { - this.fail(new Error(`cannot retrieve segment ${segNum}`)); - }); - - this.face = fw.addFace({ - rx: this.tx(), - tx: this.rx, - }, { describe }); - - this.signal?.addEventListener("abort", this.handleAbort); - } - - public close(): void { - this.signal?.removeEventListener("abort", this.handleAbort); - this.logic.close(); - this.face.close(); - } - - /** - * Pause outgoing Interests, for backpressure from Data consumer. - * @returns Function for resuming. - */ - public pause() { - return this.logic.pause(); - } - - private tx(): AsyncIterable { - return this.logic.outgoing( - ({ segNum, rto }) => { - const interest = new Interest(this.name.append(this.segmentNumConvention, segNum), - Interest.Lifetime(rto + this.lifetimeAfterRto)); - this.modifyInterest(interest); - return FwPacket.create(interest, segNum); - }, - ({ interest: { l3, token } }) => new CancelInterest(l3, token), - ); - } - - private readonly rx = async (iterable: AsyncIterable) => { - for await (const { l3, token, congestionMark = 0 } of iterable) { - if (l3 instanceof Data && typeof token === "number" && this.acceptContentType.includes(l3.contentType)) { - await this.handleData(l3, token, congestionMark); - } - } - const ok = this.logic.end(); - if (!ok) { - this.fail(new Error("fetch incomplete")); - } - }; - - private async handleData(data: Data, segNum: number, congestionMark: number) { - const now = this.logic.now(); - - try { - await this.verifier?.verify(data); - } catch (err: unknown) { - this.fail(new Error(`cannot verify segment ${segNum}: ${err}`)); - return; - } - - // TODO don't allow FetchLogic to trigger retx during verification. - this.logic.satisfy(segNum, now, congestionMark !== 0); - if (data.isFinalBlock) { - this.logic.setFinalSegNum(segNum); - } else if (data.finalBlockId?.is(this.segmentNumConvention)) { - this.logic.setFinalSegNum(data.finalBlockId.as(this.segmentNumConvention), true); - } - ++this.count_; - this.dispatchTypedEvent("segment", new Fetcher.SegmentDataEvent("segment", segNum, data)); - } - - private fail(err: Error): void { - setTimeout(() => { - this.dispatchTypedEvent("error", new CustomEvent("error", { detail: err })); - this.close(); - }, 0); - } - - private readonly handleAbort = () => { - this.fail(new Error("fetch aborted")); - }; -} - -export namespace Fetcher { - export interface Options extends FetchLogic.Options { - /** - * Inherit fetcher options from Endpoint consumer options. - * @deprecated Specify `.cOpts`. - */ - endpoint?: Endpoint; - - /** - * Inherit fetcher options from consumer options. - * - * @remarks - * These options are inherited if the corresponding fetcher option is unset: - * - `describe` - * - `fw` - * - `modifyInterest` - * - `signal` - * - `verifier` - * - * Other options cannot be inherited, notably: - * - `retx` - */ - cOpts?: ConsumerOptions; - - /** - * Use the specified logical forwarder. - * @defaultValue `Forwarder.getDefault()` - */ - fw?: Forwarder; - - /** - * FwFace description. - * @defaultValue "fetch" + name - */ - describe?: string; - - /** - * Choose a segment number naming convention. - * @defaultValue `Segment3` - */ - segmentNumConvention?: SegmentConvention; - - /** - * Modify Interest according to specified options. - * - * @remarks - * This can also be used to witness Interests without modification. - */ - modifyInterest?: Interest.Modify; - - /** AbortSignal that allows canceling the Interest via AbortController. */ - signal?: AbortSignal; - - /** - * InterestLifetime added to RTO. - * @defaultValue 1000ms - */ - lifetimeAfterRto?: number; - - /** - * List of acceptable ContentType values. - * @defaultValue `[0]` - */ - acceptContentType?: readonly number[]; - - /** - * Data verifier. - * @defaultValue - * No verification. - */ - verifier?: Verifier; - } - - export interface SegmentData { - segNum: number; - data: Data; - } - - export class SegmentDataEvent extends Event implements SegmentData { - constructor(type: string, public readonly segNum: number, public readonly data: Data) { - super(type); - } - } -} diff --git a/pkg/segmented-object/src/fetch/logic.ts b/pkg/segmented-object/src/fetch/logic.ts deleted file mode 100644 index 21919044..00000000 --- a/pkg/segmented-object/src/fetch/logic.ts +++ /dev/null @@ -1,310 +0,0 @@ -import { assert } from "@ndn/util"; -import hirestime from "hirestime"; -import DefaultWeakMap from "mnemonist/default-weak-map.js"; -import pDefer from "p-defer"; -import { pEvent } from "p-event"; -import { TypedEventTarget } from "typescript-event-target"; - -import type { CongestionAvoidance } from "./congestion-avoidance"; -import { RttEstimator } from "./rtt-estimator"; -import { TcpCubic } from "./tcp-cubic"; -import { TokenLimiter } from "./token-limiter"; - -const tokenLimiters = new DefaultWeakMap((ca) => { - const tl = new TokenLimiter(); - tl.capacity = ca.cwnd; - ca.addEventListener("cwndupdate", () => { tl.capacity = ca.cwnd; }); - return tl; -}); - -class SegState { - constructor(public readonly segNum: number) {} - - public get isRetx() { return this.nRetx > 0; } - - public nRetx = 0; - public txTime = 0; - public rto = 0; - public rtoExpiry?: NodeJS.Timeout | number; - public interest: any; -} - -type SegRequest = Pick, "segNum" | "isRetx" | "rto"> & { - interest: T; -}; - -type EventMap = { - /** Periodical unblock, for internal use. */ - unblock: Event; - - /** Fetching finished. */ - end: Event; - - /** - * A segment request has exceeded maximum retx limit and will not be retried. - * Event detail is the segment number. - */ - exceedRetxLimit: CustomEvent; -}; - -/** Congestion control logic. */ -export class FetchLogic extends TypedEventTarget { - /** Internal clock. */ - public readonly now: () => number = hirestime(); - - private readonly rtte: RttEstimator; - private readonly ca: CongestionAvoidance; - private readonly tl: TokenLimiter; - - private readonly pending = new Map(); - private readonly retxQueue = new Set(); - private readonly retxLimit: number; - - private hiInterestSegNum: number; - private hiDataSegNum = 0; - private finalSegNum: number; - private estimatedFinalSegNum: number; - private cwndDecreaseSegNum = -1; - - private running = true; - private processCancels = false; - private paused?: Promise; - - constructor({ - rtte, - ca = new TcpCubic(), - segmentRange = [0, undefined], - estimatedFinalSegNum, - retxLimit = 15, - }: FetchLogic.Options) { - super(); - this.rtte = rtte instanceof RttEstimator ? rtte : new RttEstimator(rtte); - this.ca = ca; - this.tl = tokenLimiters.get(this.ca); - this.retxLimit = retxLimit; - - this.hiInterestSegNum = segmentRange[0] - 1; - this.finalSegNum = Math.min(segmentRange[1] ?? Infinity, Number.MAX_SAFE_INTEGER) - 1; - assert(this.hiInterestSegNum < this.finalSegNum, "invalid segmentRange"); - this.estimatedFinalSegNum = estimatedFinalSegNum ?? this.finalSegNum; - } - - /** Abort. */ - public close(): void { - this.running = false; - this.dispatchTypedEvent("unblock", new Event("unblock")); - for (const [, { rtoExpiry }] of this.pending) { - clearTimeout(rtoExpiry); - } - this.tl.put(this.pending.size - this.retxQueue.size); - } - - /** - * Pause outgoing Interests, for backpressure from Data consumer. - * @returns Function for resuming. - */ - public pause(): () => void { - const defer = pDefer(); - this.paused = defer.promise; - return () => { - defer.resolve(); - this.paused = undefined; - }; - } - - /** Generate stream of outgoing requests. */ - public async *outgoing( - makeInterest: (req: SegRequest) => T, - cancelInterest: (req: SegRequest) => C, - ): AsyncGenerator { - while (this.running) { - await this.paused; - await this.tl.take(); - if (!this.running) { - this.tl.put(); - break; - } - - if (this.processCancels) { - for (const [segNum, req] of this.pending) { - if (segNum <= this.finalSegNum) { continue; } - this.pending.delete(segNum); - - if (!this.retxQueue.delete(segNum)) { - clearTimeout(req.rtoExpiry); - this.tl.put(); - yield cancelInterest(req); - } - } - this.processCancels = false; - } - - if (this.retxQueue.size > 0) { - let segNum!: number; - // eslint-disable-next-line no-unreachable-loop - for (segNum of this.retxQueue) { - this.retxQueue.delete(segNum); - break; - } - - const req = this.pending.get(segNum)!; - assert(!!req); - ++req.nRetx; - - yield this.prepareRequest(req, makeInterest); - continue; - } - - if (this.hiInterestSegNum < this.estimatedFinalSegNum) { - const segNum = ++this.hiInterestSegNum; - const req = new SegState(segNum); - this.pending.set(segNum, req); - - yield this.prepareRequest(req, makeInterest); - continue; - } - - this.tl.put(); - if (this.pending.size === 0 && this.estimatedFinalSegNum >= this.finalSegNum) { - this.dispatchTypedEvent("end", new Event("end")); - break; - } - await pEvent(this, "unblock"); - } - } - - private prepareRequest(req: SegState, makeInterest: (req: SegRequest) => T): T { - req.txTime = this.now(); - req.rto = this.rtte.rto; - req.rtoExpiry = setTimeout(() => this.rtoTimeout(req.segNum), req.rto); - req.interest = makeInterest(req); - return req.interest; - } - - /** - * Notify a request has been satisfied. - * @param now - Reading of `this.now()` at packet arrival (e.g. before verification). - */ - public satisfy(segNum: number, now: number, hasCongestionMark: boolean) { - const req = this.pending.get(segNum); - if (!req) { - return; - } - this.pending.delete(segNum); - if (!this.retxQueue.delete(segNum)) { - clearTimeout(req.rtoExpiry); - this.tl.put(); - } - - if (!req.isRetx) { - const rtt = now - req.txTime; - this.rtte.push(rtt, this.tl.nTaken + 1); - } - - if (hasCongestionMark) { - this.decrease(now, false); - } else { - this.ca.increase(now, this.rtte.sRtt); - } - - this.hiDataSegNum = Math.max(this.hiDataSegNum, segNum); - if (this.hiDataSegNum === this.estimatedFinalSegNum && this.estimatedFinalSegNum < this.finalSegNum) { - ++this.estimatedFinalSegNum; - } - this.dispatchTypedEvent("unblock", new Event("unblock")); - } - - private rtoTimeout(segNum: number) { - const req = this.pending.get(segNum)!; - assert(!!req); - this.tl.put(); - - if (segNum > this.finalSegNum) { - return; - } - - this.decrease(this.now(), true); - - if (req.nRetx >= this.retxLimit) { - this.pending.delete(segNum); - this.dispatchTypedEvent("exceedRetxLimit", new CustomEvent("exceedRetxLimit", { detail: segNum })); - } else { - this.retxQueue.add(segNum); - } - this.dispatchTypedEvent("unblock", new Event("unblock")); - } - - /** Decrease congestion window at most once per RTT. */ - private decrease(now: number, backoff: boolean) { - if (this.hiDataSegNum <= this.cwndDecreaseSegNum) { - // TODO permit one RTO backoff after each RTO duration - return; - } - this.ca.decrease(now); - if (backoff) { - this.rtte.backoff(); - } - this.cwndDecreaseSegNum = this.hiInterestSegNum; - } - - /** - * Update final segment number (inclusive) when it becomes known. - * - * @remarks - * Increasing this above `.opts.segmentRange[1]` or a previous value has no effect. - */ - public setFinalSegNum(finalSegNum: number, estimated = false): void { - if (finalSegNum >= this.finalSegNum) { - return; - } - - this.estimatedFinalSegNum = finalSegNum; - if (!estimated) { - this.finalSegNum = finalSegNum; - this.processCancels = true; - } - this.dispatchTypedEvent("unblock", new Event("unblock")); - } - - /** Notify that the incoming stream has ended. */ - public end(): boolean { - return this.pending.size === 0; - } -} - -export namespace FetchLogic { - export interface Options { - /** Use given RttEstimator instance or construct RttEstimator from options. */ - rtte?: RttEstimator | RttEstimator.Options; - - /** Use given congestion avoidance instance. */ - ca?: CongestionAvoidance; - - /** - * Specify segment number range as `[begin, end)`. - * - * @remarks - * The begin segment number is inclusive and the end segment number is exclusive. - * If the begin segment number is greater than the final segment number, fetching will fail. - * If the end segment number is undefined or greater than the final segment number, - * fetching will stop at the final segment. - */ - segmentRange?: [number, number | undefined]; - - /** - * Estimated final segment number (inclusive). - * - * @remarks - * If specified, FetchLogic sends Interests up to this segment number initially as permitted - * by congestion control, then sends further Interests in a stop-and-wait manner, unless a new - * estimation or known finalSegNum is provided via setFinalSegNum() function. - */ - estimatedFinalSegNum?: number; - - /** - * Maximum number of retransmissions, excluding initial Interest. - * @defaultValue 15 - */ - retxLimit?: number; - } -} diff --git a/pkg/segmented-object/src/fetch/token-limiter.ts b/pkg/segmented-object/src/fetch/token-limiter.ts deleted file mode 100644 index f39fafc7..00000000 --- a/pkg/segmented-object/src/fetch/token-limiter.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { assert } from "@ndn/util"; - -/** A token-based throttle limiter. */ -export class TokenLimiter { - private readonly queue = new Set<() => void>(); - private nTaken_ = 0; - - constructor(private capacity_ = 0) {} - - /** How many callers are waiting for a token. */ - public get nWaiting() { return this.queue.size; } - - /** How many tokens are currently taken. */ - public get nTaken() { return this.nTaken_; } - - /** Capacity of the token bucket. */ - public get capacity() { return this.capacity_; } - - /** Change total number of tokens. */ - public set capacity(v) { - assert(v >= 0); - this.capacity_ = Math.trunc(v); - this.unblock(); - } - - /** Wait to take a token. */ - public take(): Promise { - return new Promise((resolve) => { - this.queue.add(resolve); - this.unblock(); - }); - } - - /** Return one or more tokens. */ - public put(n = 1) { - this.nTaken_ -= n; - this.unblock(); - } - - private unblock() { - for (const fulfill of this.queue) { - if (this.nTaken_ >= this.capacity_) { - break; - } - ++this.nTaken_; - this.queue.delete(fulfill); - fulfill(); - } - } -} diff --git a/pkg/segmented-object/src/fetch/unverified.ts b/pkg/segmented-object/src/fetch/unverified.ts new file mode 100644 index 00000000..4cfff1d0 --- /dev/null +++ b/pkg/segmented-object/src/fetch/unverified.ts @@ -0,0 +1,332 @@ +import { CancelInterest, Forwarder, type FwFace, type FwPacket } from "@ndn/fw"; +import { Data, Interest, type Name } from "@ndn/packet"; +import { pushable } from "@ndn/util"; +import hirestime from "hirestime"; +import itKeepAlive from "it-keepalive"; +import take from "obliterator/take.js"; + +import { defaultSegmentConvention, type SegmentConvention } from "../convention"; +import type { CongestionAvoidance } from "./congestion-avoidance"; +import { RttEstimator } from "./rtt-estimator"; +import { TcpCubic } from "./tcp-cubic"; + +/** Precise clock for fetching algorithms. */ +const getNow = hirestime(); + +export interface UnverifiedFetcherOptions { + /** + * Use the specified logical forwarder. + * @defaultValue `Forwarder.getDefault()` + */ + fw?: Forwarder; + + /** + * FwFace description. + * @defaultValue "fetch" + name + */ + describe?: string; + + /** AbortSignal that allows canceling the fetch via AbortController. */ + signal?: AbortSignal; + + /** + * Specify segment number range as `[begin, end)`. + * + * @remarks + * The begin segment number is inclusive and the end segment number is exclusive. + * If the begin segment number is greater than the final segment number, fetching will fail. + * If the end segment number is undefined or greater than the final segment number, + * fetching will stop at the final segment. + */ + segmentRange?: [number, number | undefined]; + + /** + * Estimated final segment number (inclusive). + * + * @remarks + * This option has no effect at the moment. + * @alpha + */ + estimatedFinalSegNum?: number; + + /** + * Choose a segment number naming convention. + * @defaultValue `Segment3` + */ + segmentNumConvention?: SegmentConvention; + + /** + * Modify Interest according to specified options. + * + * @remarks + * This can also be used to witness Interests without modification. + */ + modifyInterest?: Interest.Modify; + + /** + * InterestLifetime added to RTO. + * @defaultValue 1000ms + */ + lifetimeAfterRto?: number; + + /** Use given RttEstimator instance or construct RttEstimator from options. */ + rtte?: RttEstimator | RttEstimator.Options; + + /** Use given congestion avoidance instance. */ + ca?: CongestionAvoidance; + + /** + * Maximum number of retransmissions, excluding initial Interest. + * @defaultValue 15 + */ + retxLimit?: number; + + /** + * List of acceptable ContentType values. + * @defaultValue `[0]` + */ + acceptContentType?: readonly number[]; +} + +/** Segmented object fetcher without verification. */ +export class UnverifiedFetcher { + private readonly signal?: AbortSignal; + /** Next segment number to start fetching. */ + private segNext: number; + /** Last segment number (inclusive). */ + private segLast: number; + private readonly segmentNumConvention: SegmentConvention; + private readonly modifyInterest: Interest.ModifyFunc; + private readonly lifetimeAfterRto: number; + private readonly rtte: RttEstimator; + private readonly ca: CongestionAvoidance; + private readonly retxLimit: number; + private readonly acceptContentType: readonly number[]; + + private readonly face: FwFace; + + private count_ = 0; + private nextCwndDecrease = 0; + /** Segments for which at least one Interest is sent but the Data has not arrived. */ + private readonly pendings = new Map(); + /** Segments whose RTO is exceeded and shall be retransmitted. */ + private readonly retxQ = new Set(); + /** Interests being sent to logical forwarder. */ + private readonly txQ = pushable(); + /** Data being received from logical forwarder. */ + private readonly rxQ = pushable(); + + constructor( + private readonly name: Name, + { + fw = Forwarder.getDefault(), + describe = `fetch(${name})`, + signal, + segmentRange: [segFirst, segLast1 = Number.MAX_SAFE_INTEGER] = [0, undefined], + segmentNumConvention = defaultSegmentConvention, + modifyInterest, + lifetimeAfterRto = 1000, + rtte, + ca = new TcpCubic(), + retxLimit = 15, + acceptContentType = [0], + }: UnverifiedFetcherOptions, + ) { + this.signal = signal; + this.segNext = segFirst; + this.segLast = segLast1 - 1; + this.segmentNumConvention = segmentNumConvention; + this.modifyInterest = Interest.makeModifyFunc(modifyInterest); + this.lifetimeAfterRto = lifetimeAfterRto; + this.rtte = rtte instanceof RttEstimator ? rtte : new RttEstimator(rtte); + this.ca = ca; + this.retxLimit = retxLimit; + this.acceptContentType = acceptContentType; + + this.face = fw.addFace({ + rx: this.txQ, + tx: async (iterable) => { + for await (const data of iterable) { + this.rxQ.push(data); + } + this.rxQ.stop(); + }, + }, { describe }); + } + + /** Number of segments retrieved so far. */ + public get count() { return this.count_; } + + /** + * Retrieve segments without verification. + * @returns Stream of segments. + */ + public async *fetch(): AsyncIterable { + try { + yield* this.unsafeFetch(); + } finally { + this.txQ.stop(); + this.face.close(); + } + } + + private async *unsafeFetch(): AsyncIterable { + for await (const pkt of itKeepAlive(() => false, { timeout: 4 })(this.rxQ)) { + if (this.signal?.aborted) { + throw new Error("fetch aborted"); + } + + if (pkt) { + const { l3, token, congestionMark = 0 } = pkt; + if (l3 instanceof Data && typeof token === "number" && this.acceptContentType.includes(l3.contentType)) { + yield* this.handleData(l3, token, congestionMark); + } + } + + this.processRtoExpiry(); + if (this.processTx()) { + return; + } + } + if (this.pendings.size > 0) { + throw new Error("fetch incomplete"); + } + } + + /** + * Handle Data arrival. + * @param data - Data packet. + * @param seg - Segment number. + * @param congestionMark - Congestion mark on Data packet; 0 if none. + * @returns - Successfully retrieved segment, if any. + */ + private *handleData(data: Data, seg: number, congestionMark: number): Iterable { + const fs = this.pendings.get(seg); + if (!fs) { + return; + } + + const now = getNow(); + const rtt = now - fs.txTime; + if (fs.nRetx === 0) { + this.rtte.push(rtt, this.pendings.size); + } + if (congestionMark) { + this.decreaseCwnd(now); + } else { + this.ca.increase(now, rtt); + } + + if (data.isFinalBlock) { + this.segLast = seg; + } + ++this.count_; + yield { data, seg }; + + this.retxQ.delete(seg); + this.pendings.delete(seg); + } + + /** Process RTO expirations on pending segments. */ + private processRtoExpiry(): void { + const now = getNow(); + for (const [seg, fs] of this.pendings) { + if (seg > this.segLast) { + this.pendings.delete(seg); + if (!this.retxQ.delete(seg)) { + this.txQ.push(new CancelInterest(fs.interest!, seg)); + } + continue; + } + + if (!this.retxQ.has(seg) && fs.rtoExpiry < now) { + if (fs.nRetx >= this.retxLimit) { + throw new Error(`exceed retx limit on segment ${seg}`); + } + if (this.decreaseCwnd(fs.rtoExpiry)) { + this.rtte.backoff(); + } + this.retxQ.add(seg); + } + } + } + + /** + * Transmit Interests as needed. + * @returns `true` if fetching is fully completed. + */ + private processTx(): boolean { + switch (true) { + case this.pendings.size - this.retxQ.size >= this.ca.cwnd: { + // congestion window full + break; + } + case this.retxQ.size > 0: { + const [seg] = take(this.retxQ, 1) as [number]; + this.retxQ.delete(seg); + const fs = this.pendings.get(seg)!; + ++fs.nRetx; + this.sendInterest(fs); + break; + } + case this.segNext <= this.segLast: { + const seg = this.segNext++; + const fs = new SegState(seg); + this.pendings.set(seg, fs); + this.sendInterest(fs); + break; + } + case this.pendings.size === 0: { + return true; + } + } + return false; + } + + /** Send an Interest and record TX time. */ + private sendInterest(fs: SegState): void { + const rto = this.rtte.rto; + fs.txTime = getNow(); + fs.rtoExpiry = fs.txTime + rto; + + fs.interest = new Interest(); + fs.interest.name = this.name.append(this.segmentNumConvention, fs.seg); + fs.interest.lifetime = rto + this.lifetimeAfterRto; + this.modifyInterest(fs.interest); + this.txQ.push({ l3: fs.interest, token: fs.seg }); + } + + /** + * Decrease congestion window if allowed. + * @param effAt - Effective time for the decreasing. + * @returns Whether decreasing was allowed to happen. + */ + private decreaseCwnd(effAt: number): boolean { + if (effAt < this.nextCwndDecrease) { + // react to one congestion event per RTO + return false; + } + this.nextCwndDecrease = effAt + this.rtte.rto; + this.ca.decrease(effAt); + return true; + } +} + +export interface SegData { + seg: number; + data: Data; +} + +/** Per-segment state. */ +class SegState { + constructor(public readonly seg: number) {} + + /** Last Interest TX time. */ + public txTime = 0; + /** RTO expiration time for the last Interest. */ + public rtoExpiry = 0; + /** Last Interest packet. */ + public interest?: Interest; + /** Number of retransmissions. 0 means initial Interest. */ + public nRetx = 0; +} diff --git a/pkg/segmented-object/tests/serve-fetch.t.ts b/pkg/segmented-object/tests/serve-fetch.t.ts index c9c3cb71..b754128c 100644 --- a/pkg/segmented-object/tests/serve-fetch.t.ts +++ b/pkg/segmented-object/tests/serve-fetch.t.ts @@ -3,11 +3,11 @@ import "@ndn/util/test-fixture/expect"; // eslint-disable-next-line n/no-unsupported-features/node-builtins import { Blob } from "node:buffer"; -import { consume, produce, type ProducerHandler } from "@ndn/endpoint"; +import { consume } from "@ndn/endpoint"; import { Forwarder } from "@ndn/fw"; import { Bridge } from "@ndn/l3face"; import { Segment2, Segment3 } from "@ndn/naming-convention2"; -import { Data, FwHint, Name, type Verifier } from "@ndn/packet"; +import { FwHint, Name, type Verifier } from "@ndn/packet"; import { Closers, delay } from "@ndn/util"; import { makeTmpDir } from "@ndn/util/test-fixture/tmp"; import { BufferReadableMock, BufferWritableMock } from "stream-mock"; @@ -40,10 +40,14 @@ test("buffer to buffer", async () => { const server = serve("/R", chunkSource); closers.push(server); + const nFaces = Forwarder.getDefault().faces.size; + const fetched = fetch("/R"); expect(fetched.count).toBe(0); await expect(fetched).resolves.toEqualUint8Array(objectBody); expect(fetched.count).toBeGreaterThan(0); + + expect(Forwarder.getDefault().faces.size).toBe(nFaces); }); test("blob to chunks", async () => { @@ -160,13 +164,9 @@ test.each<(fw: Forwarder, fwHint: FwHint) => fetch.Options>([ }); describe("empty object", () => { - const handler1 = vi.fn, ReturnType>( - async (interest) => new Data(interest.name, Data.ContentType(3))); beforeEach(() => { - handler1.mockReset(); const server = serve("/R", new BufferChunkSource(new Uint8Array())); - const producer1 = produce(server.prefix.append(Segment3, 1), handler1); - closers.push(server, producer1); + closers.push(server); }); test("consume single", async () => { @@ -180,18 +180,21 @@ describe("empty object", () => { const fetched = fetch("/R"); await expect(fetched).resolves.toHaveLength(0); expect(fetched.count).toBe(1); - expect(handler1).toHaveBeenCalled(); }); test.each<(verifier: Verifier) => fetch.Options>([ (verifier) => ({ verifier }), (verifier) => ({ cOpts: { verifier } }), ])("verify error %#", async (makeOpts) => { + const nFaces = Forwarder.getDefault().faces.size; + const verify = vi.fn, ReturnType>() .mockRejectedValue(new Error("mock-verify-error")); await expect(fetch("/R", { retxLimit: 0, ...makeOpts({ verify }) })) .rejects.toThrow(/mock-verify-error/); expect(verify).toHaveBeenCalledTimes(1); + + expect(Forwarder.getDefault().faces.size).toBe(nFaces); }); }); @@ -199,7 +202,11 @@ test("segment number convention mismatch", async () => { const server = serve("/R", new BufferChunkSource(objectBody), { segmentNumConvention: Segment2 }); closers.push(server); + const nFaces = Forwarder.getDefault().faces.size; + await expect(fetch("/R", { retxLimit: 1 })).rejects.toThrow(); + + expect(Forwarder.getDefault().faces.size).toBe(nFaces); }); test("abort", async () => {