Skip to content

Commit

Permalink
node-transport,etc: make constructors private
Browse files Browse the repository at this point in the history
Constructors of TCP/UDP/Unix/H3/WebSocket transports are now private.
connect() function is moved into the class as a static method.

Transport.tx is changed from a field to a method.

Transport.Rx, Transport.Tx, udp_helper.Socket type aliases are deleted.
  • Loading branch information
yoursunny committed Feb 1, 2024
1 parent 407d85a commit dd06815
Show file tree
Hide file tree
Showing 24 changed files with 297 additions and 350 deletions.
2 changes: 1 addition & 1 deletion packages/dpdkmgmt/src/face.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ const openFaceScheme = {
throw new Error(`unexpected locator: ${JSON.stringify(loc)}`);
}
await udp_helper.connect(sock, { host, port });
return [new UdpTransport(sock), mtu];
return [await UdpTransport.connect(sock), mtu];
});
} catch (err: unknown) {
sock.close();
Expand Down
59 changes: 32 additions & 27 deletions packages/dpdkmgmt/src/memif-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@ import type { Memif } from "memif";

/** Shared Memory Packet Interface (memif) transport. */
export class MemifTransport extends Transport {
public override readonly rx: Transport.Rx;
public override readonly tx: Transport.Tx;
/** Create a transport and establish connection. */
public static async connect(opts: MemifTransport.Options): Promise<MemifTransport> {
const {
waitUp = true,
} = opts;

/**
* Access the underlying Memif instance.
* You may read counters and monitor "memif:up" "memif:down" events, but not send/receive packets.
*/
public readonly memif: Memif;
private readonly mtu_: number;
const MemifConstructor: typeof Memif = (await import("memif")).Memif;
const transport = new MemifTransport(opts, new MemifConstructor(opts));

if (waitUp) {
await once(transport.memif, "memif:up");
}
return transport;
}

constructor(opts: MemifTransport.Options, memif: Memif) {
private constructor(opts: MemifTransport.Options, memif: Memif) {
super({
describe: `Memif(${opts.socketName}:${opts.id ?? 0})`,
local: true,
Expand All @@ -25,36 +30,36 @@ export class MemifTransport extends Transport {
this.memif = memif;
this.mtu_ = Math.min(memif.dataroom, opts.dataroom ?? Infinity);
this.rx = rxFromPacketIterable(this.memif);
this.tx = txToStream(this.memif);
}

/**
* Access the underlying Memif instance.
*
* @remarks
* You may read counters and monitor "memif:up" "memif:down" events, but not send/receive packets.
*/
public readonly memif: Memif;
private readonly mtu_: number;

public override get mtu() { return this.mtu_; }

public override readonly rx: Transport.RxIterable;

public override async tx(iterable: Transport.TxIterable) {
return txToStream(this.memif, iterable);
}
}

export namespace MemifTransport {
/** {@link MemifTransport.connect} options. */
export interface Options extends Memif.Options {
/**
* Whether to wait until the connection is up.
* Default is true;
* @defaultValue true
*/
waitUp?: boolean;
}

/** Create a memif transport. */
export async function connect(opts: Options): Promise<MemifTransport> {
const {
waitUp = true,
} = opts;

const MemifConstructor: typeof Memif = (await import("memif")).Memif;
const transport = new MemifTransport(opts, new MemifConstructor(opts));

if (waitUp) {
await once(transport.memif, "memif:up");
}
return transport;
}

/** Create a memif transport and add to forwarder. */
export const createFace = L3Face.makeCreateFace(connect);
export const createFace = L3Face.makeCreateFace(MemifTransport.connect);
}
6 changes: 3 additions & 3 deletions packages/l3face/src/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { L3Face } from "./l3face";
import { Transport } from "./transport";

class BridgeTransport extends Transport {
public override readonly rx: Transport.Rx;
public override readonly rx: Transport.RxIterable;
public bridgePeer?: BridgeTransport;
private readonly bridgeRx = pushable<Uint8Array>({ objectMode: true });

Expand All @@ -18,7 +18,7 @@ class BridgeTransport extends Transport {
this.rx = map((wire) => new Decoder(wire).read(), relay(this.bridgeRx));
}

public override readonly tx = async (iterable: AsyncIterable<Uint8Array>) => {
public override async tx(iterable: Transport.TxIterable) {
assert(this.bridgePeer, "bridgePeer must be set");
const iterator = iterable[Symbol.asyncIterator]();
while (true) {
Expand All @@ -32,7 +32,7 @@ class BridgeTransport extends Transport {
const copy = result.value.slice();
this.bridgePeer.bridgeRx.push(copy);
}
};
}
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/l3face/src/l3face.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export class L3Face extends TypedEventTarget<EventMap> implements FwFace.RxTx {
}
}

private async *rxTransform(transportRx: Transport.Rx): AsyncIterable<FwPacket> {
private async *rxTransform(transportRx: Transport.RxIterable): AsyncIterable<FwPacket> {
yield* pipeline(
() => transportRx,
this.lp.rx,
Expand All @@ -141,7 +141,7 @@ export class L3Face extends TypedEventTarget<EventMap> implements FwFace.RxTx {
);
}

private txTransform(fwTx: AsyncIterable<FwPacket>): AsyncIterable<Uint8Array> {
private txTransform(fwTx: AsyncIterable<FwPacket>): Transport.TxIterable {
return pipeline(
() => fwTx,
filter((pkt: FwPacket) => FwPacket.isEncodable(pkt)),
Expand Down
4 changes: 2 additions & 2 deletions packages/l3face/src/rxtx-iterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import type { Transport } from "./transport";
/**
* Decode TLVs from datagrams.
* @param iterable - RX datagram stream, such as a UDP socket.
* @returns AsyncIterable of TLVs.
* @returns RX packet stream.
*/
export async function* rxFromPacketIterable(iterable: AsyncIterable<Uint8Array>): Transport.Rx {
export async function* rxFromPacketIterable(iterable: AsyncIterable<Uint8Array>): Transport.RxIterable {
for await (const pkt of safeIter(iterable)) {
const decoder = new Decoder(pkt);
let tlv: Decoder.Tlv;
Expand Down
30 changes: 14 additions & 16 deletions packages/l3face/src/rxtx-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import type { Transport } from "./transport";
/**
* Extract TLVs from continuous byte stream.
* @param conn - RX byte stream, such as a TCP socket.
* @returns AsyncIterable of TLVs.
* @returns RX packet stream.
*/
export async function* rxFromStream(conn: NodeJS.ReadableStream): Transport.Rx {
export async function* rxFromStream(conn: NodeJS.ReadableStream): Transport.RxIterable {
let leftover = new Uint8Array();
for await (const chunk of safeIter(conn as AsyncIterable<Buffer>)) {
if (leftover.length > 0) {
Expand Down Expand Up @@ -43,25 +43,23 @@ export async function* rxFromStream(conn: NodeJS.ReadableStream): Transport.Rx {
/**
* Pipe encoded packets to output stream.
* @param conn - TX output stream, such as a TCP socket.
* @returns Function that accepts AsyncIterable of encoded TLVs.
* @param iterable - TX packet stream.
*
* @remarks
* `conn` will be closed/destroyed upon reaching the end of packet stream.
*/
export function txToStream(conn: NodeJS.WritableStream): Transport.Tx {
return async (iterable: AsyncIterable<Uint8Array>) => {
export async function txToStream(conn: NodeJS.WritableStream, iterable: Transport.TxIterable): Promise<void> {
try {
await writeToStream(conn, iterable);
} finally {
try {
await writeToStream(conn, iterable);
} finally {
try {
conn.end();
await pEvent(conn, "finish", { timeout: 100 });
} catch {}
conn.end();
await pEvent(conn, "finish", { timeout: 100 });
} catch {}

const socket = conn as Socket;
if (typeof socket.destroy === "function") {
socket.destroy();
}
const socket = conn as Socket;
if (typeof socket.destroy === "function") {
socket.destroy();
}
};
}
}
14 changes: 8 additions & 6 deletions packages/l3face/src/stream-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ import { rxFromStream, txToStream } from "./rxtx-stream";
import { Transport } from "./transport";

/** Node.js stream-based transport. */
export class StreamTransport extends Transport {
public override readonly rx: Transport.Rx;
public override readonly tx: Transport.Tx;

constructor(conn: NodeJS.ReadWriteStream, attrs: Record<string, unknown> = {}) {
export class StreamTransport<T extends NodeJS.ReadWriteStream = NodeJS.ReadWriteStream> extends Transport {
constructor(protected readonly conn: T, attrs: Record<string, unknown> = {}) {
super(attrs);
this.rx = rxFromStream(conn);
this.tx = txToStream(conn);
}

/** Report MTU as Infinity. */
public override get mtu() { return Infinity; }

public override readonly rx: Transport.RxIterable;

public override tx(iterable: Transport.TxIterable) {
return txToStream(this.conn, iterable);
}
}
27 changes: 15 additions & 12 deletions packages/l3face/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ const DEFAULT_MTU = 1200;
* The transport understands NDN TLV structures, but does not otherwise concern with packet format.
*/
export abstract class Transport {
/** Iterable of outgoing packets. */
public abstract readonly rx: Transport.Rx;
/** Function to accept iterable of incoming packets. */
public abstract readonly tx: Transport.Tx;

/**
* Constructor.
* @param attributes - Attributes of the transport.
Expand All @@ -31,6 +26,17 @@ export abstract class Transport {
*/
public get mtu() { return DEFAULT_MTU; }

/** Iterable of incoming packets received through the transport. */
public abstract readonly rx: Transport.RxIterable;

/**
* Function to accept outgoing packet stream.
* @param iterable - Iterable of outgoing packets sent through the transport.
* Size of each packet cannot exceed `.mtu`.
* @returns Promise that resolves when iterable is exhausted or rejects upon error.
*/
public abstract tx(iterable: Transport.TxIterable): Promise<void>;

/* eslint-disable tsdoc/syntax -- tsdoc-missing-reference */
/**
* Reopen the transport after it has failed.
Expand Down Expand Up @@ -75,14 +81,11 @@ export namespace Transport {
[k: string]: unknown;
}

/** RX iterable for incoming packets. */
export type Rx = AsyncIterable<Decoder.Tlv>;
/** RX packet stream. */
export type RxIterable = AsyncIterable<Decoder.Tlv>;

/**
* TX function for outgoing packets.
* @returns Promise that resolves when iterable is exhausted, and rejects upon error.
*/
export type Tx = (iterable: AsyncIterable<Uint8Array>) => Promise<void>;
/** TX packet stream. */
export type TxIterable = AsyncIterable<Uint8Array>;

/**
* Error thrown by {@link Transport.reopen} to indicate that reopen operation is not supported.
Expand Down
2 changes: 1 addition & 1 deletion packages/lp/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class LpService {
}
}

public tx = (iterable: AsyncIterable<LpService.Packet>): AsyncIterable<Uint8Array | LpService.TxError> => flatMapOnce(
public readonly tx = (iterable: AsyncIterable<LpService.Packet>): AsyncIterable<Uint8Array | LpService.TxError> => flatMapOnce(
(pkt) => this.encode(pkt),
this.keepAlive ?
itKeepAlive<LpService.Packet | false>(() => false, { timeout: this.keepAlive })(iterable) :
Expand Down

0 comments on commit dd06815

Please sign in to comment.