Skip to content

Commit

Permalink
docs(csp): add docs for all Channel ops
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Apr 28, 2024
1 parent 6bb0107 commit 84c36d7
Showing 1 changed file with 119 additions and 0 deletions.
119 changes: 119 additions & 0 deletions packages/csp/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ export interface ChannelOpts {
id: string;
}

/**
* Syntax sugar for creating a new CSP {@link Channel}. By default, the channel
* has a buffer capacity of 1 value, but supports custom buffer sizes and/or
* implementations (described in readme).
*
* @param opts
*/
export function channel<T>(opts?: Partial<ChannelOpts>): Channel<T>;
export function channel<T>(
buf: ChannelBuffer<T> | number,
Expand All @@ -34,6 +41,11 @@ export class Channel<T> implements IChannel<T> {
races: Fn<Channel<T>, void>[] = [];
protected state = STATE_OPEN;

/**
* See {@link channel} for reference.
*
* @param opts
*/
constructor(opts?: Partial<ChannelOpts>);
constructor(buf: ChannelBuffer<T> | number, opts?: Partial<ChannelOpts>);
constructor(...args: any[]) {
Expand All @@ -52,13 +64,58 @@ export class Channel<T> implements IChannel<T> {
this.id = opts?.id ?? `chan-${__nextID()}`;
}

/**
* Returns an async iterator of this channel, acting as adapter between the
* CSP world and the more generic ES async iterables. The iterator stops
* once the channel has been closed and no further values can be read.
*
* @remarks
* Multiple iterators will compete for new values. To ensure an iterator
* receives all of the channel's values, you must either ensure there's only
* a single iterator per channel or feed the channel into a {@link mult}
* first and create an iterator of a channel obtained via
* {@link Mult.subscribe}.
*
* @example
* ```ts tangle:../export/channel-iterator.ts
* import { channel } from "@thi.ng/csp";
*
* const chan = channel<number>();
*
* (async () => {
* // implicit iterator conversion of the channel
* for await(let x of chan) console.log("received", x);
* console.log("channel closed");
* })()
*
* chan.write(1);
* chan.write(2);
* chan.write(3);
* chan.close();
* ```
*/
async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
while (this.state < STATE_CLOSED) {
const x = await this.read();
if (x !== undefined) yield x;
}
}

/**
* Attempts to read a value from the channel. The returned promise will
* block until such value becomes available or if the channel has been
* closed in the meantime. In that latter case, the promise will resolve to
* `undefined`.
*
* @remarks
* If a value is already available at the time of the function call, the
* promise resolves immediately.
*
* Note: There's a limit of in-flight {@link MAX_READS} allowed per channel.
* The promise will reject if that number is exceeded.
*
* Also see {@link Channel.poll}.
*/
read() {
return new Promise<Maybe<T>>((resolve) => {
if (!this.readable()) {
Expand All @@ -78,6 +135,17 @@ export class Channel<T> implements IChannel<T> {
});
}

/**
* Similar to {@link Channel.read}, but not async and non-blocking. Will
* only succeed if the channel is readable (i.e. not yet closed) and if a
* value can be read immediately (without waiting). Returns the value or
* `undefined` if unsuccessful.
*
* @remarks
* Use {@link Channel.closed} to check if the channel is already closed.
*
* @param msg
*/
poll(): Maybe<T> {
const { reads, writes } = this;
if (this.readable() && !reads.length && writes.readable()) {
Expand All @@ -88,6 +156,25 @@ export class Channel<T> implements IChannel<T> {
}
}

/**
* Attempts to write a new value to the channel and returns a promise
* indicating success or failure. Depending on buffer capacity & behavior,
* the returned promise will block until the channel accept new values (i.e.
* until the next {@link Channel.read}) or if it has been closed in the
* meantime. In that latter case, the promise will resolve to false.
*
* @remarks
* If the channel's buffer accepts new writes or if a read op is already
* waiting at the time of the function call, the promise resolves
* immediately.
*
* If the buffer is already full, the write will be queued and only resolve
* when delivered. Note: As a fail-safe, there's a limit of queued
* {@link MAX_WRITES} allowed per channel. The promise will reject if that
* number is exceeded.
*
* Also see {@link Channel.offer}.
*/
write(msg: T) {
return new Promise<boolean>((resolve) => {
if (!this.writable()) {
Expand All @@ -111,6 +198,14 @@ export class Channel<T> implements IChannel<T> {
});
}

/**
* Similar to {@link Channel.write}, but not async and non-blocking. Will
* only succeed if the channel is writable (i.e. not yet closed/closing) and
* if a write is immediately possible (without queuing). Returns true, if
* successful.
*
* @param msg
*/
offer(msg: T) {
if (this.writable() && this.writes.writable()) {
this.write(msg);
Expand All @@ -119,6 +214,16 @@ export class Channel<T> implements IChannel<T> {
return false;
}

/**
* Queues a "race" operation & returns a promise which resolves with the
* channel itself when the channel becomes readable, but no other queued
* read operations are waiting (which always have priority). If the channel
* is already closed, the promise resolves immediately.
*
* @remarks
* This op is used internally by {@link select} to choose a channel to read
* from next.
*/
race() {
return new Promise<Channel<T>>((resolve) => {
if (!this.readable()) {
Expand All @@ -132,6 +237,15 @@ export class Channel<T> implements IChannel<T> {
});
}

/**
* Triggers closing of the channel (idempotent). Any queued writes remain
* readable, but new writes will be ignored.
*
* @remarks
* Whilst there're still values available for reading,
* {@link Channel.closed} will still return false since the channel state is
* still "closing", not yet fully "closed".
*/
close() {
if (this.state >= STATE_CLOSING) return;
const { reads, writes, races } = this;
Expand Down Expand Up @@ -168,6 +282,11 @@ export class Channel<T> implements IChannel<T> {
/**
* Returns true if the channel is fully closed and no further reads or
* writes are possible.
*
* @remarks
* Whilst there're still values available for reading, this will still
* return false since the channel state is still "closing", not yet fully
* "closed".
*/
closed() {
return this.state === STATE_CLOSED;
Expand Down

0 comments on commit 84c36d7

Please sign in to comment.