Skip to content

Commit

Permalink
Stream/signal unification part 2 - Sources!
Browse files Browse the repository at this point in the history
Signals are now usable as Sources.  The old Source type is now
Producer, and the new Source type is a union of Signals and Producers.

This was necessary to avoid overloading shenanigans and incorrect
type inference for things that accepted Sources.  Going forward,
functions should usually *take* Sources and *return* Producers.
Producer is a narrower interface so TS is generally happy with
this approach.
  • Loading branch information
pjeby committed May 4, 2024
1 parent 27ac4ba commit 167cff9
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 108 deletions.
28 changes: 19 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ But Uneventful is actually *better* than statecharts, even for design purposes:
```ts
import { each } from "uneventful";

function supportDragDrop(node: HTMLElement) {
function supportDragDrop(parentNode: HTMLElement) {
return start(function*(job) {
const mouseDown = fromDomEvent(node, "mousedown");
for (const {item: node, next} of yield *each(mouseDown)) {
const dropTarget = yield *drag(node);
// do something with the drop here
const mouseDown = fromDomEvent(parentNode, "mousedown");
for (const {item: event, next} of yield *each(mouseDown)) {
if (event.target.matches(".drag-handle") {
const dropTarget = yield *drag(event.target.closest(".draggable"));
// do something with the dropTarget here
}
yield next; // wait for next mousedown
});
});
Expand All @@ -75,7 +77,7 @@ function supportDragDrop(node: HTMLElement) {
Where our previous job did a bunch of things in parallel, this one is *serial*. If the previous job was akin to a Promise constructor, this one is more like an async function. It loops over an event like it was an async iterator, but it does so semi-synchronously. (Specifically, each pass of the loop starts *during* the event being responded to, not in a later microtask!)
Then it starts a drag job, and waits for its completion, receiving the return value in much the same way as an `await` does -- but again, semi-synchronously, during the mouseup event that ends the `drag()` call. (Note: this synchronous return-from-a-job is specific to using `yield` in another job function: if you `await` a job or call its `.then()` method to obtain the result, it'll happen in a later microtask as is normal for promise-based APIs.)
Then it starts a drag job, and waits for its completion, receiving the return value in much the same way as an `await` does -- but again, semi-synchronously, during the mouseup event that ends the `drag()` call. (Note: this pseudo-synchronous return-from-a-job is specific to using `yield` in another job function: if you `await` a job or call its `.then()` method to obtain the result, it'll happen in a later microtask as is normal for promise-based APIs.)
And though we haven't shown any details here of what's being *done* with the drop, it's possible that we'll kick off some additional jobs to do an animation or contact a server or something of that sort, and wait for those to finish before enabling drag again. (Unless of course we *want* them to be able to overlap with additional dragging, in which case we can spin off detached jobs.)
Expand Down Expand Up @@ -114,9 +116,9 @@ start(job => {
});
```
Let's say that `currentlyHoveredFolder` is a stream that sends events as the hover state changes: either a folder object or `null` if no hovering is happening. The `restarting()` API wraps the event handler with a "temp" job that is canceled and restarted each time the function is called.
Let's say that `currentlyHoveredFolder` is a stream (or signal!) that sends events as the hover state changes: either a folder object or `null` if no hovering is happening. The `restarting()` API wraps the event handler with a "temp" job that is canceled and restarted each time the function is called.
With this setup, the "open the folder here" code will only be reached if the hover time on a given folder exceeds 300ms. Otherwise, the next change in the hovered folder will cancel the sleeping job (incidentally clearing the timer ID allocated by the `sleep()` as it does so).
With this setup, the "open the folder here" code will only be reached if the hover time on a given folder exceeds 300ms. Otherwise, the next change in the hovered folder will cancel the sleeping job (incidentally clearing the timeout allocated by the `sleep()` as it does so).
Now, in this simple example you *could* just directly do the debouncing by manipulating the stream. And for a lot of simple things, that might even be the best way to do it. Some event driven libraries might even have lots of handy built-in ways to do things like canceling your in-flight ajax requests when the user types in a search field.
Expand All @@ -131,7 +133,15 @@ Why the differences? Uneventful is all about *making clear what your code is do
(But of course, if you're migrating from another signal framework, or are just really attached to the more obscure terminology, you can still rename them in your code with `import as`!)
Beyond these superficial differences, though, there are some deeper ones. Unlike other libraries' "effects", Uneventful's rules *start asynchronously* and can be *independently scheduled*. This means, for example, that it's easy to make rules that run only in, say, animation frames:
Beyond these superficial differences, though, there are some deeper ones.
First off, in Uneventful, *signals are also streams*. When signals are used in APIs that expect streams (including `each()`), they send their current value on the initial subscription, followed by new values when their values change.
And they also support *backpressure*: if you iterate over a signal's values with `each()`, then the changes are based on sampling the value when the loop isn't busy (i.e. during the `yield next`). This makes it really easy to (for example) loop over the various value of an input field doing remote searches with them, while maintaining a desired search frequency or level connection saturation using `sleep()` delays in the loop.
Second, you can also *turn streams into signals*, by passing them to `cached()`. So if you want a signal that tracks the current mouse position or modifier keys' state, just use `cached(fromDomEvent(...))` or `pipe(fromDomEvent(...), map(...), cached)`, and off you go! As long as the resulting signal is observed by a rule (directly or indirectly) it subscribes to the stream and returns the most recent value. And as soon as all its observers go away, the underlying source is unsubscribed, so there are no dangling event listeners.
But wait, there's more: Unlike most libraries' "effects", Uneventful's rules *start asynchronously* and can be *independently scheduled*. This means, for example, that it's easy to make rules that run only in, say, animation frames:
```ts
import { RuleScheduler } from "uneventful";
Expand Down
47 changes: 45 additions & 2 deletions specs/signals.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { log, see, describe, expect, it, useRoot } from "./dev_deps.ts";
import { log, see, describe, expect, it, useRoot, useClock, clock } from "./dev_deps.ts";
import {
runRules, value, cached, rule, noDeps, WriteConflict, Signal, Writable, must, recalcWhen,
DisposeFn, RecalcSource, mockSource, lazy
DisposeFn, RecalcSource, mockSource, lazy, detached, each, sleep
} from "../mod.ts";

// Verify a signal of a given value returns the right things from its methods
Expand All @@ -21,6 +21,7 @@ function verifyMulti(f: <T>(v: T) => Signal<T>) {

describe("Signal Constructors/Interfaces", () => {
useRoot();
useClock();
describe("value()", () => {
it("implements the Signal interface", () => { verifyMulti(value); });
it("is a Writable instance", () => {
Expand Down Expand Up @@ -75,6 +76,27 @@ describe("Signal Constructors/Interfaces", () => {
s.value = 9999;
see("9999");
});
it("can be subscribed as a source", () => {
// Given a value and a job that iterates over it with pauses
const v = value(42), j = detached.start(function *(){
for(const {item, next} of yield *each(v)) {
log(item); yield *sleep(10); yield next;
}
});
// When the job starts, it should output the initial value
clock.tick(0); see("42");
// And it should reflect changes in the value over time
v.set(99); clock.tick(10); see("99");
v.set(27); clock.tick(10); see("27");
// But changes made while paused are overlooked
v.set(54); clock.tick(5); see();
v.set(22); clock.tick(5); see("22");
v.set(33); clock.tick(5); see();
// And if the value changes back to the previously-seen value
// Then there's no new output when iteration resumes
v.set(22); clock.tick(5); see();
j.end();
});
});
describe("cached()", () => {
it("implements the Signal interface", () => { verifyMulti((v) => cached(() => v)); });
Expand Down Expand Up @@ -106,6 +128,27 @@ describe("Signal Constructors/Interfaces", () => {
s.value = 9999;
see("9999");
});
it("can be subscribed as a source", () => {
// Given a cached based on a value, and a job that iterates it with pauses
const v = value(42), s = cached(() => v()*2), j = detached.start(function *(){
for(const {item, next} of yield *each(s)) {
log(item); yield *sleep(10); yield next;
}
});
// When the job starts, it should output the initial value
clock.tick(0); see("84");
// And it should reflect changes in the value over time
v.set(99); clock.tick(10); see("198");
v.set(27); clock.tick(10); see("54");
// But changes made while paused are overlooked
v.set(54); clock.tick(5); see();
v.set(22); clock.tick(5); see("44");
v.set(33); clock.tick(5); see();
// And if the value changes back to the previously-seen value
// Then there's no new output when iteration resumes
v.set(22); clock.tick(5); see();
j.end();
});
});
describe("cached(stream, initVal)", () => {
it("Follows source when observed, initVal otherwise", () => {
Expand Down
36 changes: 4 additions & 32 deletions specs/sources.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import {
log, waitAndSee, see, describe, expect, it, useClock, clock, useRoot, createStubInstance, spy
} from "./dev_deps.ts";
import { throttle, connect, value, runRules, isError, JobResult, isValue, markHandled } from "../src/mod.ts";
import { runPulls } from "../src/scheduling.ts";
import {
emitter, empty, fromAsyncIterable, fromDomEvent, fromIterable, fromPromise, fromSignal,
fromValue, fromSubscribe, interval, lazy, never, Emitter, mockSource
} from "../src/sources.ts";
emitter, empty, fromAsyncIterable, fromDomEvent, fromIterable, fromPromise,
fromValue, fromSubscribe, interval, lazy, never, Emitter, mockSource,
throttle, connect, isError, JobResult, isValue, markHandled
} from "../src/mod.ts";

function logClose(e: JobResult<void>) { log("closed"); if (isError(e)) log(`err: ${markHandled(e)}`)}

Expand Down Expand Up @@ -296,34 +296,6 @@ describe("Sources", () => {
see(); await Promise.resolve(); await Promise.resolve(); see("42", "closed");
});
});
describe("fromSignal()", () => {
it("should output each value of the signal, including the first", () => {
// Given a fromSignal(value())
const v = value(42), s = fromSignal(v);
// When it's subscribed
connect(s, log.emit);
// Then it should output the current value once rules+pulls run
see(); runRules(); runPulls(); see("42");
// And output the latest current value on subsequent runs
v.set(43); runPulls(); v.set(44);
see(); runRules(); runPulls(); see("44");
});
it("should not emit duplicate values", () => {
// Given a fromSignal(func())
const v1 = value(42), v2 = value(0);
const f = () => v1() * v2(), s = fromSignal(f);
// When it's subscribed
connect(s, log.emit);
// Then it should output the current value once rules+pulls run
see(); runRules(); runPulls(); see("0");
// And should not output duplicates even if dependencies change
v1.set(43); v1.set(44);
see(); runRules(); runPulls(); see();
// But should still output changes to the result
v2.set(1);
see(); runRules(); runPulls(); see("44");
});
});
describe("fromSubscribe()", () => {
useClock();
it("should subscribe w/pusher after defer, and unsub on close", () => {
Expand Down
16 changes: 14 additions & 2 deletions src/cells.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { defer } from "./defer.ts";
import { RunQueue } from "./scheduling.ts";
import { DisposeFn, OptionalCleanup, RecalcSource } from "./types.ts"
import { detached, getJob, makeJob } from "./tracking.ts";
import { IsStream, Source } from "./streams.ts";
import { Connection, Inlet, IsStream, Sink, Producer, backpressure } from "./streams.ts";
import { setMap } from "./utils.ts";
import { isCancel } from "./results.ts";

Expand Down Expand Up @@ -248,6 +248,8 @@ function delsub(sub: Subscription) {
freesubs = sub;
}

const sentinel = {} // a unique value for uniqueness checking

/** @internal */
export class Cell {
value: any = undefined // the value, or, for a rule, the scheduler
Expand All @@ -265,7 +267,17 @@ export class Cell {

compute: () => any = undefined;

stream<T>(sink: Sink<T>, conn?: Connection, inlet?: Inlet) {
let lastValue = sentinel;
(inlet ? RuleScheduler.for(backpressure(inlet)).rule : rule)(() => {
const val = this.getValue();
if (val !== lastValue) sink(lastValue = val);
});
return IsStream;
}

getValue() {
if (arguments.length) return this.stream.apply(this, arguments as any);
this.catchUp();
const dep = current.cell;
if (dep) {
Expand Down Expand Up @@ -432,7 +444,7 @@ export class Cell {
return cell;
}

static mkStream<T>(src: Source<T>, val?: T): () => T {
static mkStream<T>(src: Producer<T>, val?: T): () => T {
const cell = this.mkValue(val);
cell.flags |= Is.Stream;
cell.ctx = makeCtx();
Expand Down
12 changes: 6 additions & 6 deletions src/operators.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { fromIterable } from "./sources.ts";
import { Connection, IsStream, Sink, Source, Transformer, backpressure, throttle } from "./streams.ts";
import { Connection, IsStream, Producer, Sink, Source, Transformer, backpressure, throttle } from "./streams.ts";
import { isValue, noop } from "./results.ts";
import { start } from "./jobutils.ts";

Expand All @@ -16,7 +16,7 @@ import { start } from "./jobutils.ts";
*
* @category Stream Operators
*/
export function concat<T>(sources: Source<T>[] | Iterable<Source<T>>): Source<T> {
export function concat<T>(sources: Source<T>[] | Iterable<Source<T>>): Producer<T> {
return concatAll(fromIterable(sources))
}

Expand All @@ -33,7 +33,7 @@ export function concat<T>(sources: Source<T>[] | Iterable<Source<T>>): Source<T>
*
* @category Stream Operators
*/
export function concatAll<T>(sources: Source<Source<T>>): Source<T> {
export function concatAll<T>(sources: Source<Source<T>>): Producer<T> {
return (sink, conn=start(), inlet) => {
let inner: Connection;
const inputs: Source<T>[] = [], t = throttle();
Expand Down Expand Up @@ -112,7 +112,7 @@ export function map<T,R>(mapper: (v: T, idx: number) => R): Transformer<T,R> {
*
* @category Stream Operators
*/
export function merge<T>(sources: Source<T>[] | Iterable<Source<T>>): Source<T> {
export function merge<T>(sources: Source<T>[] | Iterable<Source<T>>): Producer<T> {
return mergeAll(fromIterable(sources));
}

Expand All @@ -124,7 +124,7 @@ export function merge<T>(sources: Source<T>[] | Iterable<Source<T>>): Source<T>
*
* @category Stream Operators
*/
export function mergeAll<T>(sources: Source<Source<T>>): Source<T> {
export function mergeAll<T>(sources: Source<Source<T>>): Producer<T> {
return (sink, conn=start(), inlet) => {
const uplinks: Set<Connection> = new Set;
let outer = conn.connect(sources, (s) => {
Expand Down Expand Up @@ -264,7 +264,7 @@ export function slack<T>(size: number, dropped: Sink<T> = noop): Transformer<T>
*
* @category Stream Operators
*/
export function switchAll<T>(sources: Source<Source<T>>): Source<T> {
export function switchAll<T>(sources: Source<Source<T>>): Producer<T> {
return (sink, conn=start(), inlet) => {
let inner: Connection;
let outer = conn.connect(sources, s => {
Expand Down
20 changes: 16 additions & 4 deletions src/signals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ import { PlainFunction, Yielding, RecalcSource } from "./types.ts";
import { Cell, rule } from "./cells.ts";
import { reject, resolve } from "./results.ts";
import { UntilMethod } from "./sinks.ts";
import { Source } from "./streams.ts";
import { Connection, Inlet, IsStream, Sink, Source, Producer } from "./streams.ts";
export { RuleScheduler, rule, runRules, WriteConflict, CircularDependency } from "./cells.ts";

export interface Signal<T> {
/**
* A signal object implements the {@link Producer} interface, even if it's
* not directly recognized as one by TypeScript.
*/
(sink: Sink<T>, conn?: Connection, inlet?: Inlet): typeof IsStream

/** A signal object can be called to get its current value */
(): T
}
Expand Down Expand Up @@ -35,7 +41,10 @@ export class Signal<T> extends Function implements UntilMethod<T> {
readonly(): Signal<T> { return this; }

/** New writable signal with a custom setter */
withSet(set: (v: T) => unknown) { return mkSignal(() => this(), set); }
withSet(set: (v: T) => unknown) {
const that = this;
return mkSignal( function () { return that.apply(null, arguments); }, set);
}

*"uneventful.until"(): Yielding<T> {
return yield (r => {
Expand Down Expand Up @@ -70,7 +79,10 @@ export interface Writable<T> {
export class Writable<T> extends Signal<T> {
get value() { return this(); }
set value(val: T) { this.set(val); }
readonly() { return mkSignal(() => this()); }
readonly(): Signal<T> {
const that = this;
return mkSignal<T>( function () { return that.apply(null, arguments); });
}
}

/**
Expand Down Expand Up @@ -118,7 +130,7 @@ export function cached<T extends Signal<any>>(signal: T): T
export function cached<T>(compute: Source<T> | (() => T), initVal?: T): Signal<T> {
if (compute instanceof Signal) return compute;
return mkSignal(
compute.length ? Cell.mkStream(compute as Source<T>, initVal) : Cell.mkCached(compute as () => T)
compute.length ? Cell.mkStream(compute as Producer<T>, initVal) : Cell.mkCached(compute as () => T)
);
}

Expand Down
Loading

0 comments on commit 167cff9

Please sign in to comment.