Skip to content

Commit

Permalink
svs: speed up SVS-PS test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Feb 10, 2024
1 parent 9b234f4 commit dc89dc2
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 32 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"@yoursunny/xo-config": "0.57.1",
"codedown": "^3.1.0",
"tslib": "^2.6.2",
"typedoc": "^0.25.7",
"typedoc": "^0.25.8",
"typescript": "~5.3.3",
"vitest": "^1.2.2"
},
Expand Down
1 change: 1 addition & 0 deletions pkg/svs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"@ndn/sync-api": "workspace:*",
"@ndn/tlv": "workspace:*",
"@ndn/util": "workspace:*",
"p-defer": "^4.0.0",
"streaming-iterables": "^8.0.1",
"tslib": "^2.6.2",
"type-fest": "^4.10.2",
Expand Down
37 changes: 23 additions & 14 deletions pkg/svs/src/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Endpoint, type Producer, type ProducerHandler } from "@ndn/endpoint";
import { Interest, Name, type NameLike, nullSigner, type Signer, type Verifier } from "@ndn/packet";
import { type SyncNode, type SyncProtocol, SyncUpdate } from "@ndn/sync-api";
import { CustomEvent, randomJitter, trackEventListener } from "@ndn/util";
import { type Promisable } from "type-fest";
import type { Promisable } from "type-fest";
import { TypedEventTarget } from "typescript-event-target";

import { StateVector } from "./state-vector";
Expand Down Expand Up @@ -112,6 +112,14 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
return new StateVector(this.own);
}

/**
* Multi-purpose callback passed to {@link SvSyncNode} constructor.
*
* @remarks
* - `nodeOp(id)`: get seqNum
* - `nodeOp(id, n)`: set seqNum, return new seqNum
* - `nodeOp(id, 0)`: delete node during initialization
*/
private readonly nodeOp = (id: Name, n: number | undefined): number => {
if (n !== undefined) { // setSeqNum requested
if (!this.producer) { // decrement/remove permitted during initialization
Expand Down Expand Up @@ -167,18 +175,18 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
ourNewer: ourNewer.length,
});
if (ourNewer.length > 0) {
this.sendSyncInterest();
void this.sendSyncInterest();
}
this.aggregated = undefined;
} else { // in steady state
this.debug("timer");
this.sendSyncInterest();
void this.sendSyncInterest();
}

this.resetTimer();
};

private sendSyncInterest(): void {
private async sendSyncInterest(): Promise<void> {
this.debug("send");

const interest = new Interest();
Expand All @@ -187,14 +195,15 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
interest.mustBeFresh = true;
interest.lifetime = this.syncInterestLifetime;

void (async () => {
await this.signer.sign(interest);
try {
await this.endpoint.consume(interest, {
describe: `${this.describe}[c]`,
});
} catch {}
})();
await this.signer.sign(interest);
try {
await this.endpoint.consume(interest, {
describe: `${this.describe}[c]`,
retx: 0,
});
} catch {
// not expecting a reply, so that a timeout will happen and it shall be ignored
}
}
}

Expand All @@ -208,6 +217,7 @@ export namespace SvSync {
*/
export type Timer = [ms: number, jitter: number];

/** {@link SvSync.create} options. */
export interface Options {
/**
* Endpoint for communication.
Expand All @@ -232,8 +242,7 @@ export namespace SvSync {
* During initialization, it's possible to remove SyncNode or decrease seqNum.
* Calling `sync.close()` has no effect.
*
* Sync protocol starts running after the function has returned and the returned Promise
* is resolved.
* Sync protocol starts running after the returned Promise is resolved.
*/
initialize?: (sync: SvSync) => Promisable<void>;

Expand Down
33 changes: 18 additions & 15 deletions pkg/svs/tests/svsps.t.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Name, type NameLike } from "@ndn/packet";
import { DataStore } from "@ndn/repo";
import { Closers, console, crypto, delay } from "@ndn/util";
import memdown from "memdown";
import pDefer from "p-defer";
import { afterEach, beforeAll, expect, test, vi } from "vitest";

import { type MappingEntry, type Subscription, SvPublisher, SvSubscriber, SvSync, TimedMappingEntry } from "..";
Expand Down Expand Up @@ -54,7 +55,6 @@ function enableDebug(subs: Record<string, SvSubscriber<any>>): void {
}

type Sub = Subscription<Name, SvSubscriber.Update>;
type UpdateHandler = (update: SvSubscriber.Update) => void;

async function publishCheck(
publisher: SvPublisher,
Expand All @@ -66,28 +66,32 @@ async function publishCheck(
) {
const abort = new AbortController();
const received = Array.from<SvSubscriber.Update | undefined>({ length: expectReceive.length });
let nWaiting = expectReceive.length;
const allReceived = pDefer<void>();
for (const [i, sub] of expectReceive.entries()) {
let isReceived = false;
const handleUpdate: UpdateHandler = (update) => {
expect(isReceived).toBeFalsy();
isReceived = true;
sub.addEventListener("update", (update) => { // eslint-disable-line @typescript-eslint/no-loop-func
expect(received[i]).toBeUndefined();
received[i] = update;
};
sub.addEventListener("update", handleUpdate, { signal: abort.signal });
if (--nWaiting === 0) {
setTimeout(() => allReceived.resolve(), 200);
}
}, { signal: abort.signal });
}
for (const sub of expectNotReceive) {
const handleUpdate: UpdateHandler = ({ publisher, seqNum, name }) => {
sub.addEventListener("update", ({ publisher, seqNum, name }) => {
expect.fail(`unexpected update ${publisher}:${seqNum} ${name}`);
};
sub.addEventListener("update", handleUpdate, { signal: abort.signal });
}, { signal: abort.signal });
}

const payload = crypto.getRandomValues(new Uint8Array(payloadLength));
const seqNum = await publisher.publish(name, payload, entry);
await delay(1000);
await Promise.race([
delay(1000),
allReceived.promise,
]);

for (const update of received) {
expect(update).toBeTruthy();
expect(update).toBeDefined();
expect(update!.publisher).toEqualName(publisher.id);
expect(update!.seqNum).toBe(seqNum);
expect(update!.name).toEqualName(name);
Expand All @@ -107,8 +111,7 @@ test("simple", async () => {

const repoA = new DataStore(memdown());
const repoB = new DataStore(memdown());
const repoC = new DataStore(memdown());
closers.push(repoA, repoB, repoC);
closers.push(repoA, repoB);

const pubA0 = new SvPublisher({ ...pubOpts, sync: syncA, id: new Name("/0"), store: repoA });
const pubA1 = new SvPublisher({ ...pubOpts, sync: syncA, id: new Name("/1"), store: repoA });
Expand Down Expand Up @@ -138,7 +141,7 @@ test("simple", async () => {

// bad inner signature
await publishCheck(pubB7, "/t/7", 100, undefined, [], [subC1, subC9, subDt, subD0]);
// bad outer signaturet
// bad outer signature
await publishCheck(pubB8, "/t/8", 100, undefined, [], [subC1, subC9, subDt, subD0]);
// bad mapping signature, but subC would not retrieve mapping
await publishCheck(pubB9, "/t/9", 100, undefined, [subC9], [subC1, subDt, subD0]);
Expand Down
8 changes: 6 additions & 2 deletions pkg/sync-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@

This package is part of [NDNts](https://yoursunny.com/p/NDNts/), Named Data Networking libraries for the modern web.

This package defines the programming interface of dataset synchronization and publish-subscribe protocols, and functions that work with this interface.
It also includes functions common to multiple sync protocol implementations.
This package defines the abstract programming interfaces of dataset synchronization and publish-subscribe protocols.
Each sync/pubsub protocol package shall re-export relevant interfaces in type-only mode.
Users do not need to import from this package directly.

This package also includes functions common to multiple sync/pubsub protocol implementations.
These are implementation details of sync/pubsub protocols.

0 comments on commit dc89dc2

Please sign in to comment.