diff --git a/README.md b/README.md index aa9c679..c5aa9e8 100644 --- a/README.md +++ b/README.md @@ -61,12 +61,14 @@ But Uneventful is actually *better* than statecharts, even for design purposes: ```ts import { each } from "uneventful"; -function supportDragDrop(node: HTMLElement) { +function supportDragDrop(parentNode: HTMLElement) { return start(function*(job) { - const mouseDown = fromDomEvent(node, "mousedown"); - for (const {item: node, next} of yield *each(mouseDown)) { - const dropTarget = yield *drag(node); - // do something with the drop here + const mouseDown = fromDomEvent(parentNode, "mousedown"); + for (const {item: event, next} of yield *each(mouseDown)) { + if (event.target.matches(".drag-handle") { + const dropTarget = yield *drag(event.target.closest(".draggable")); + // do something with the dropTarget here + } yield next; // wait for next mousedown }); }); @@ -75,7 +77,7 @@ function supportDragDrop(node: HTMLElement) { Where our previous job did a bunch of things in parallel, this one is *serial*. If the previous job was akin to a Promise constructor, this one is more like an async function. It loops over an event like it was an async iterator, but it does so semi-synchronously. (Specifically, each pass of the loop starts *during* the event being responded to, not in a later microtask!) -Then it starts a drag job, and waits for its completion, receiving the return value in much the same way as an `await` does -- but again, semi-synchronously, during the mouseup event that ends the `drag()` call. (Note: this synchronous return-from-a-job is specific to using `yield` in another job function: if you `await` a job or call its `.then()` method to obtain the result, it'll happen in a later microtask as is normal for promise-based APIs.) +Then it starts a drag job, and waits for its completion, receiving the return value in much the same way as an `await` does -- but again, semi-synchronously, during the mouseup event that ends the `drag()` call. (Note: this pseudo-synchronous return-from-a-job is specific to using `yield` in another job function: if you `await` a job or call its `.then()` method to obtain the result, it'll happen in a later microtask as is normal for promise-based APIs.) And though we haven't shown any details here of what's being *done* with the drop, it's possible that we'll kick off some additional jobs to do an animation or contact a server or something of that sort, and wait for those to finish before enabling drag again. (Unless of course we *want* them to be able to overlap with additional dragging, in which case we can spin off detached jobs.) @@ -114,9 +116,9 @@ start(job => { }); ``` -Let's say that `currentlyHoveredFolder` is a stream that sends events as the hover state changes: either a folder object or `null` if no hovering is happening. The `restarting()` API wraps the event handler with a "temp" job that is canceled and restarted each time the function is called. +Let's say that `currentlyHoveredFolder` is a stream (or signal!) that sends events as the hover state changes: either a folder object or `null` if no hovering is happening. The `restarting()` API wraps the event handler with a "temp" job that is canceled and restarted each time the function is called. -With this setup, the "open the folder here" code will only be reached if the hover time on a given folder exceeds 300ms. Otherwise, the next change in the hovered folder will cancel the sleeping job (incidentally clearing the timer ID allocated by the `sleep()` as it does so). +With this setup, the "open the folder here" code will only be reached if the hover time on a given folder exceeds 300ms. Otherwise, the next change in the hovered folder will cancel the sleeping job (incidentally clearing the timeout allocated by the `sleep()` as it does so). Now, in this simple example you *could* just directly do the debouncing by manipulating the stream. And for a lot of simple things, that might even be the best way to do it. Some event driven libraries might even have lots of handy built-in ways to do things like canceling your in-flight ajax requests when the user types in a search field. @@ -131,7 +133,15 @@ Why the differences? Uneventful is all about *making clear what your code is do (But of course, if you're migrating from another signal framework, or are just really attached to the more obscure terminology, you can still rename them in your code with `import as`!) -Beyond these superficial differences, though, there are some deeper ones. Unlike other libraries' "effects", Uneventful's rules *start asynchronously* and can be *independently scheduled*. This means, for example, that it's easy to make rules that run only in, say, animation frames: +Beyond these superficial differences, though, there are some deeper ones. + +First off, in Uneventful, *signals are also streams*. When signals are used in APIs that expect streams (including `each()`), they send their current value on the initial subscription, followed by new values when their values change. + +And they also support *backpressure*: if you iterate over a signal's values with `each()`, then the changes are based on sampling the value when the loop isn't busy (i.e. during the `yield next`). This makes it really easy to (for example) loop over the various value of an input field doing remote searches with them, while maintaining a desired search frequency or level connection saturation using `sleep()` delays in the loop. + +Second, you can also *turn streams into signals*, by passing them to `cached()`. So if you want a signal that tracks the current mouse position or modifier keys' state, just use `cached(fromDomEvent(...))` or `pipe(fromDomEvent(...), map(...), cached)`, and off you go! As long as the resulting signal is observed by a rule (directly or indirectly) it subscribes to the stream and returns the most recent value. And as soon as all its observers go away, the underlying source is unsubscribed, so there are no dangling event listeners. + +But wait, there's more: Unlike most libraries' "effects", Uneventful's rules *start asynchronously* and can be *independently scheduled*. This means, for example, that it's easy to make rules that run only in, say, animation frames: ```ts import { RuleScheduler } from "uneventful"; diff --git a/specs/signals.spec.ts b/specs/signals.spec.ts index f322fee..b44fe7c 100644 --- a/specs/signals.spec.ts +++ b/specs/signals.spec.ts @@ -1,7 +1,7 @@ -import { log, see, describe, expect, it, useRoot } from "./dev_deps.ts"; +import { log, see, describe, expect, it, useRoot, useClock, clock } from "./dev_deps.ts"; import { runRules, value, cached, rule, noDeps, WriteConflict, Signal, Writable, must, recalcWhen, - DisposeFn, RecalcSource, mockSource, lazy + DisposeFn, RecalcSource, mockSource, lazy, detached, each, sleep } from "../mod.ts"; // Verify a signal of a given value returns the right things from its methods @@ -21,6 +21,7 @@ function verifyMulti(f: (v: T) => Signal) { describe("Signal Constructors/Interfaces", () => { useRoot(); + useClock(); describe("value()", () => { it("implements the Signal interface", () => { verifyMulti(value); }); it("is a Writable instance", () => { @@ -75,6 +76,27 @@ describe("Signal Constructors/Interfaces", () => { s.value = 9999; see("9999"); }); + it("can be subscribed as a source", () => { + // Given a value and a job that iterates over it with pauses + const v = value(42), j = detached.start(function *(){ + for(const {item, next} of yield *each(v)) { + log(item); yield *sleep(10); yield next; + } + }); + // When the job starts, it should output the initial value + clock.tick(0); see("42"); + // And it should reflect changes in the value over time + v.set(99); clock.tick(10); see("99"); + v.set(27); clock.tick(10); see("27"); + // But changes made while paused are overlooked + v.set(54); clock.tick(5); see(); + v.set(22); clock.tick(5); see("22"); + v.set(33); clock.tick(5); see(); + // And if the value changes back to the previously-seen value + // Then there's no new output when iteration resumes + v.set(22); clock.tick(5); see(); + j.end(); + }); }); describe("cached()", () => { it("implements the Signal interface", () => { verifyMulti((v) => cached(() => v)); }); @@ -106,6 +128,27 @@ describe("Signal Constructors/Interfaces", () => { s.value = 9999; see("9999"); }); + it("can be subscribed as a source", () => { + // Given a cached based on a value, and a job that iterates it with pauses + const v = value(42), s = cached(() => v()*2), j = detached.start(function *(){ + for(const {item, next} of yield *each(s)) { + log(item); yield *sleep(10); yield next; + } + }); + // When the job starts, it should output the initial value + clock.tick(0); see("84"); + // And it should reflect changes in the value over time + v.set(99); clock.tick(10); see("198"); + v.set(27); clock.tick(10); see("54"); + // But changes made while paused are overlooked + v.set(54); clock.tick(5); see(); + v.set(22); clock.tick(5); see("44"); + v.set(33); clock.tick(5); see(); + // And if the value changes back to the previously-seen value + // Then there's no new output when iteration resumes + v.set(22); clock.tick(5); see(); + j.end(); + }); }); describe("cached(stream, initVal)", () => { it("Follows source when observed, initVal otherwise", () => { diff --git a/specs/sources.spec.ts b/specs/sources.spec.ts index 0bb52bd..4fe2b10 100644 --- a/specs/sources.spec.ts +++ b/specs/sources.spec.ts @@ -1,12 +1,12 @@ import { log, waitAndSee, see, describe, expect, it, useClock, clock, useRoot, createStubInstance, spy } from "./dev_deps.ts"; -import { throttle, connect, value, runRules, isError, JobResult, isValue, markHandled } from "../src/mod.ts"; import { runPulls } from "../src/scheduling.ts"; import { - emitter, empty, fromAsyncIterable, fromDomEvent, fromIterable, fromPromise, fromSignal, - fromValue, fromSubscribe, interval, lazy, never, Emitter, mockSource -} from "../src/sources.ts"; + emitter, empty, fromAsyncIterable, fromDomEvent, fromIterable, fromPromise, + fromValue, fromSubscribe, interval, lazy, never, Emitter, mockSource, + throttle, connect, isError, JobResult, isValue, markHandled +} from "../src/mod.ts"; function logClose(e: JobResult) { log("closed"); if (isError(e)) log(`err: ${markHandled(e)}`)} @@ -296,34 +296,6 @@ describe("Sources", () => { see(); await Promise.resolve(); await Promise.resolve(); see("42", "closed"); }); }); - describe("fromSignal()", () => { - it("should output each value of the signal, including the first", () => { - // Given a fromSignal(value()) - const v = value(42), s = fromSignal(v); - // When it's subscribed - connect(s, log.emit); - // Then it should output the current value once rules+pulls run - see(); runRules(); runPulls(); see("42"); - // And output the latest current value on subsequent runs - v.set(43); runPulls(); v.set(44); - see(); runRules(); runPulls(); see("44"); - }); - it("should not emit duplicate values", () => { - // Given a fromSignal(func()) - const v1 = value(42), v2 = value(0); - const f = () => v1() * v2(), s = fromSignal(f); - // When it's subscribed - connect(s, log.emit); - // Then it should output the current value once rules+pulls run - see(); runRules(); runPulls(); see("0"); - // And should not output duplicates even if dependencies change - v1.set(43); v1.set(44); - see(); runRules(); runPulls(); see(); - // But should still output changes to the result - v2.set(1); - see(); runRules(); runPulls(); see("44"); - }); - }); describe("fromSubscribe()", () => { useClock(); it("should subscribe w/pusher after defer, and unsub on close", () => { diff --git a/src/cells.ts b/src/cells.ts index ff9d388..020d1de 100644 --- a/src/cells.ts +++ b/src/cells.ts @@ -3,7 +3,7 @@ import { defer } from "./defer.ts"; import { RunQueue } from "./scheduling.ts"; import { DisposeFn, OptionalCleanup, RecalcSource } from "./types.ts" import { detached, getJob, makeJob } from "./tracking.ts"; -import { IsStream, Source } from "./streams.ts"; +import { Connection, Inlet, IsStream, Sink, Producer, backpressure } from "./streams.ts"; import { setMap } from "./utils.ts"; import { isCancel } from "./results.ts"; @@ -248,6 +248,8 @@ function delsub(sub: Subscription) { freesubs = sub; } +const sentinel = {} // a unique value for uniqueness checking + /** @internal */ export class Cell { value: any = undefined // the value, or, for a rule, the scheduler @@ -265,7 +267,17 @@ export class Cell { compute: () => any = undefined; + stream(sink: Sink, conn?: Connection, inlet?: Inlet) { + let lastValue = sentinel; + (inlet ? RuleScheduler.for(backpressure(inlet)).rule : rule)(() => { + const val = this.getValue(); + if (val !== lastValue) sink(lastValue = val); + }); + return IsStream; + } + getValue() { + if (arguments.length) return this.stream.apply(this, arguments as any); this.catchUp(); const dep = current.cell; if (dep) { @@ -432,7 +444,7 @@ export class Cell { return cell; } - static mkStream(src: Source, val?: T): () => T { + static mkStream(src: Producer, val?: T): () => T { const cell = this.mkValue(val); cell.flags |= Is.Stream; cell.ctx = makeCtx(); diff --git a/src/operators.ts b/src/operators.ts index adad6ac..19b3af4 100644 --- a/src/operators.ts +++ b/src/operators.ts @@ -1,5 +1,5 @@ import { fromIterable } from "./sources.ts"; -import { Connection, IsStream, Sink, Source, Transformer, backpressure, throttle } from "./streams.ts"; +import { Connection, IsStream, Producer, Sink, Source, Transformer, backpressure, throttle } from "./streams.ts"; import { isValue, noop } from "./results.ts"; import { start } from "./jobutils.ts"; @@ -16,7 +16,7 @@ import { start } from "./jobutils.ts"; * * @category Stream Operators */ -export function concat(sources: Source[] | Iterable>): Source { +export function concat(sources: Source[] | Iterable>): Producer { return concatAll(fromIterable(sources)) } @@ -33,7 +33,7 @@ export function concat(sources: Source[] | Iterable>): Source * * @category Stream Operators */ -export function concatAll(sources: Source>): Source { +export function concatAll(sources: Source>): Producer { return (sink, conn=start(), inlet) => { let inner: Connection; const inputs: Source[] = [], t = throttle(); @@ -112,7 +112,7 @@ export function map(mapper: (v: T, idx: number) => R): Transformer { * * @category Stream Operators */ -export function merge(sources: Source[] | Iterable>): Source { +export function merge(sources: Source[] | Iterable>): Producer { return mergeAll(fromIterable(sources)); } @@ -124,7 +124,7 @@ export function merge(sources: Source[] | Iterable>): Source * * @category Stream Operators */ -export function mergeAll(sources: Source>): Source { +export function mergeAll(sources: Source>): Producer { return (sink, conn=start(), inlet) => { const uplinks: Set = new Set; let outer = conn.connect(sources, (s) => { @@ -264,7 +264,7 @@ export function slack(size: number, dropped: Sink = noop): Transformer * * @category Stream Operators */ -export function switchAll(sources: Source>): Source { +export function switchAll(sources: Source>): Producer { return (sink, conn=start(), inlet) => { let inner: Connection; let outer = conn.connect(sources, s => { diff --git a/src/signals.ts b/src/signals.ts index 371a046..5f827eb 100644 --- a/src/signals.ts +++ b/src/signals.ts @@ -3,10 +3,16 @@ import { PlainFunction, Yielding, RecalcSource } from "./types.ts"; import { Cell, rule } from "./cells.ts"; import { reject, resolve } from "./results.ts"; import { UntilMethod } from "./sinks.ts"; -import { Source } from "./streams.ts"; +import { Connection, Inlet, IsStream, Sink, Source, Producer } from "./streams.ts"; export { RuleScheduler, rule, runRules, WriteConflict, CircularDependency } from "./cells.ts"; export interface Signal { + /** + * A signal object implements the {@link Producer} interface, even if it's + * not directly recognized as one by TypeScript. + */ + (sink: Sink, conn?: Connection, inlet?: Inlet): typeof IsStream + /** A signal object can be called to get its current value */ (): T } @@ -35,7 +41,10 @@ export class Signal extends Function implements UntilMethod { readonly(): Signal { return this; } /** New writable signal with a custom setter */ - withSet(set: (v: T) => unknown) { return mkSignal(() => this(), set); } + withSet(set: (v: T) => unknown) { + const that = this; + return mkSignal( function () { return that.apply(null, arguments); }, set); + } *"uneventful.until"(): Yielding { return yield (r => { @@ -70,7 +79,10 @@ export interface Writable { export class Writable extends Signal { get value() { return this(); } set value(val: T) { this.set(val); } - readonly() { return mkSignal(() => this()); } + readonly(): Signal { + const that = this; + return mkSignal( function () { return that.apply(null, arguments); }); + } } /** @@ -118,7 +130,7 @@ export function cached>(signal: T): T export function cached(compute: Source | (() => T), initVal?: T): Signal { if (compute instanceof Signal) return compute; return mkSignal( - compute.length ? Cell.mkStream(compute as Source, initVal) : Cell.mkCached(compute as () => T) + compute.length ? Cell.mkStream(compute as Producer, initVal) : Cell.mkCached(compute as () => T) ); } diff --git a/src/sources.ts b/src/sources.ts index 0111b25..17092c0 100644 --- a/src/sources.ts +++ b/src/sources.ts @@ -1,6 +1,5 @@ import { defer } from "./defer.ts"; -import { RuleScheduler, cached } from "./signals.ts"; -import { type Source, IsStream, backpressure, Sink, Connection, Backpressure, throttle, Inlet } from "./streams.ts"; +import { type Source, IsStream, backpressure, Sink, Connection, Backpressure, throttle, Inlet, Producer } from "./streams.ts"; import { getJob, detached } from "./tracking.ts"; import { must, start } from "./jobutils.ts"; import { DisposeFn } from "./types.ts"; @@ -17,7 +16,7 @@ export interface Emitter { /** Call the emitter to emit events on its .source */ (val: T): void; /** An event source that receives the events */ - source: Source; + source: Producer; /** Close all current subscribers' connections */ end: () => void; /** Close all current subscribers' connections with an error */ @@ -46,7 +45,7 @@ export function emitter(): Emitter { * * @category Stream Producers */ -export function empty(): Source { +export function empty(): Producer { return (_, conn) => (conn?.return(), IsStream); } @@ -59,7 +58,7 @@ export function empty(): Source { * * @category Stream Producers */ -export function fromAsyncIterable(iterable: AsyncIterable): Source { +export function fromAsyncIterable(iterable: AsyncIterable): Producer { return (sink, conn=start(), inlet) => { const ready = backpressure(inlet); const iter = iterable[Symbol.asyncIterator](); @@ -92,19 +91,19 @@ export function fromAsyncIterable(iterable: AsyncIterable): Source { */ export function fromDomEvent( target: T, type: K, options?: boolean | AddEventListenerOptions -): Source; +): Producer; export function fromDomEvent( target: T, type: K, options?: boolean | AddEventListenerOptions -): Source; +): Producer; export function fromDomEvent( target: T, type: K, options?: boolean | AddEventListenerOptions -): Source; +): Producer; export function fromDomEvent( target: EventTarget, type: string, options?: boolean | AddEventListenerOptions -): Source +): Producer export function fromDomEvent( target: T, type: K, options?: boolean | AddEventListenerOptions -): Source { +): Producer { return (sink) => { function push(v: Event) { sink(v); } target.addEventListener(type, push, options); @@ -122,7 +121,7 @@ export function fromDomEvent( * * @category Stream Producers */ -export function fromIterable(iterable: Iterable): Source { +export function fromIterable(iterable: Iterable): Producer { return (sink, conn=start(), inlet) => { const ready = backpressure(inlet); const iter = iterable[Symbol.iterator](); @@ -153,7 +152,7 @@ export function fromIterable(iterable: Iterable): Source { * * @category Stream Producers */ -export function fromPromise(promise: Promise|PromiseLike|T): Source { +export function fromPromise(promise: Promise|PromiseLike|T): Producer { return (sink, conn) => { const job = getJob(); Promise.resolve(promise).then( @@ -164,27 +163,6 @@ export function fromPromise(promise: Promise|PromiseLike|T): Source } } -/** - * Create an event source from a signal (or signal-using function) - * - * The resulting event source will emit an event equal to each value the given - * function produces, including its current value at the time of subscription. - * (Technically, at the time of its first post-subscription scheduling.) - * - * @param scheduler - An {@link RuleScheduler} that will be used to sample the - * signal. Events will be only be emitted when the given scheduler is run. If - * no scheduler is given, the default (microtask-based) scheduler is used. - * - * @category Stream Producers - */ -export function fromSignal(s: () => T, scheduler = RuleScheduler.for(defer)): Source { - s = cached(s); - return (sink) => { - scheduler.rule(() => { sink(s()); }); - return IsStream; - } -} - /** * Create an event source from an arbitrary subscribe/unsubscribe function * @@ -198,7 +176,7 @@ export function fromSignal(s: () => T, scheduler = RuleScheduler.for(defer)): * * @category Stream Producers */ -export function fromSubscribe(subscribe: (cb: (val: T) => void) => DisposeFn): Source { +export function fromSubscribe(subscribe: (cb: (val: T) => void) => DisposeFn): Producer { return (sink) => { const f = getJob().must(() => sink = noop); return defer(() => f.must(subscribe(v => { sink(v); }))), IsStream; @@ -210,7 +188,7 @@ export function fromSubscribe(subscribe: (cb: (val: T) => void) => DisposeFn) * * @category Stream Producers */ -export function fromValue(val: T): Source { +export function fromValue(val: T): Producer { return (sink, conn) => { must(() => { sink = noop; conn = undefined; }) return defer(() => { sink(val); conn?.return(); }), IsStream; @@ -223,7 +201,7 @@ export function fromValue(val: T): Source { * * @category Stream Producers */ -export function interval(ms: number): Source { +export function interval(ms: number): Producer { return (sink) => { let idx = 0, id = setInterval(() => sink(idx++), ms); return must(() => clearInterval(id)), IsStream; @@ -240,7 +218,7 @@ export function interval(ms: number): Source { * * @category Stream Producers */ -export function lazy(factory: () => Source): Source { +export function lazy(factory: () => Source): Producer { return (sink, conn) => factory()(sink, conn) } @@ -265,12 +243,12 @@ export interface MockSource extends Emitter { */ export function mockSource(): MockSource { let write: Sink, outlet: Connection, ready: Backpressure; - function emit(val: T) { if (write) write(val); }; - emit.source = (mockSource ? (x: Source) => x : share)((sink, conn, inlet) => { + const emit: MockSource = (val: T) => { if (write) write(val); }; + emit.source = (sink, conn, inlet) => { write = sink; outlet = conn; ready = backpressure(inlet); must(() => write = outlet = ready = undefined); return IsStream; - }); + }; emit.end = () => outlet?.return(); emit.throw = (e: any) => outlet?.throw(e); emit.ready = (cb?: () => any) => ready(cb); @@ -282,7 +260,7 @@ export function mockSource(): MockSource { * * @category Stream Producers */ -export function never(): Source { +export function never(): Producer { return () => IsStream; } @@ -307,7 +285,7 @@ export function never(): Source { * * @category Stream Operators */ -export function share(source: Source): Source { +export function share(source: Source): Producer { let uplink: Connection; const links = new Set<[sink: Sink, conn: Connection]>, diff --git a/src/streams.ts b/src/streams.ts index 179ed97..8a52756 100644 --- a/src/streams.ts +++ b/src/streams.ts @@ -2,6 +2,7 @@ import { pulls } from "./scheduling.ts"; import { DisposeFn, Job } from "./types.ts"; import { getJob } from "./tracking.ts"; import { current } from "./ambient.ts"; +import { Signal, Writable } from "./signals.ts"; /** * A backpressure controller: returns true if downstream is ready to accept @@ -25,7 +26,7 @@ export type Backpressure = (cb?: () => any) => boolean */ export function backpressure(inlet: Inlet = defaultInlet): Backpressure { const job = getJob(); - return (cb?: Producer) => { + return (cb?: Flush) => { if (!job.result() && inlet.isOpen()) { if (cb) inlet.onReady(cb, job); return inlet.isReady(); @@ -85,7 +86,7 @@ export interface Throttle extends Inlet { export type Connection = Job; /** - * A `Source` is a function that can be called to arrange for data to be + * A Producer is a function that can be called to arrange for data to be * produced and sent to a {@link Sink} function for consumption, until the * associated {@link Connection} is closed (either by the source or the sink, * e.g. if the sink doesn't want more data or the source has no more to send). @@ -93,13 +94,24 @@ export type Connection = Job; * If the source is a backpressurable stream, it can use the (optional) supplied * inlet (usually a {@link throttle}()) to rate-limit its output. * - * A source function *must* return the special {@link IsStream} value, so + * A producer function *must* return the special {@link IsStream} value, so * TypeScript can tell what functions are usable as sources. (Otherwise any * void function with no arguments would appear to be usable as a source!) * * @category Types and Interfaces */ -export type Source = (sink: Sink, conn?: Connection, inlet?: Inlet) => typeof IsStream; +export type Producer = (sink: Sink, conn?: Connection, inlet?: Throttle | Inlet) => typeof IsStream; + +/** + * A Source is either a {@link Producer} or a {@link Signal}. (Signals actually + * implement the {@link Producer} interface as an overload, but TypeScript gets + * confused about that sometimes, so we generally declare our stream *inputs* as + * `Source` and our stream *outputs* as {@link Producer}, so that TypeScript + * knows what's what. + * + * @category Types and Interfaces + */ +export type Source = Producer | Signal | Writable; /** * A specially-typed string used to verify that a function supports uneventful's @@ -124,9 +136,9 @@ export type Sink = (val: T) => void; * * @category Types and Interfaces */ -export type Transformer = (input: Source) => Source; +export type Transformer = (input: Source) => Producer; -type Producer = () => any +type Flush = () => any /** @@ -142,7 +154,7 @@ type Producer = () => any * * @category Stream Consumers */ -export function connect(src: Source, sink: Sink, inlet?: Inlet): Connection { +export function connect(src: Source, sink: Sink, inlet?: Throttle | Inlet): Connection { return getJob().connect(src, sink, inlet); } @@ -163,7 +175,7 @@ export function throttle(job: Job = current.job): Throttle { class _Throttle implements Throttle { /** @internal */ - protected _callbacks: Map = undefined; + protected _callbacks: Map = undefined; /** @internal */ constructor(protected _job?: Job) {} @@ -176,7 +188,7 @@ class _Throttle implements Throttle { _isReady = true; _isPulling = false; - onReady(cb: Producer, job: Job) { + onReady(cb: Flush, job: Job) { if (!this.isOpen()) return this; const _callbacks = (this._callbacks ||= new Map); const unlink = job.release(() => _callbacks.delete(cb)); diff --git a/src/types.ts b/src/types.ts index 1b9a811..4d353fd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,5 +1,5 @@ import { JobResult } from "./results.ts"; -import { Connection, Inlet, Sink, Source } from "./streams.ts"; +import { Connection, Inlet, Sink, Source, Throttle } from "./streams.ts"; /** * An undefined or null value @@ -221,7 +221,7 @@ export interface Job extends Yielding, Promise { * * @category Execution Control */ - connect(src: Source, sink: Sink, inlet?: Inlet): Connection + connect(src: Source, sink: Sink, inlet?: Throttle | Inlet): Connection /** * Invoke a function with this job as the active one, so that calling the