Skip to content

Commit

Permalink
Stream/signal unification part 1: cached(source)
Browse files Browse the repository at this point in the history
Streams are now trivially convertible to signals.  Next up:
going the other way!
  • Loading branch information
pjeby committed May 4, 2024
1 parent 44f9378 commit 27ac4ba
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 19 deletions.
29 changes: 28 additions & 1 deletion specs/signals.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { log, see, describe, expect, it, useRoot } from "./dev_deps.ts";
import {
runRules, value, cached, rule, noDeps, WriteConflict, Signal, Writable, must, recalcWhen,
DisposeFn, RecalcSource
DisposeFn, RecalcSource, mockSource, lazy
} from "../mod.ts";

// Verify a signal of a given value returns the right things from its methods
Expand Down Expand Up @@ -107,6 +107,33 @@ describe("Signal Constructors/Interfaces", () => {
see("9999");
});
});
describe("cached(stream, initVal)", () => {
it("Follows source when observed, initVal otherwise", () => {
// Given a mock source lazily wrapped in a cached()
const e = mockSource<string>();
const s = lazy(() => {
log("subscribe"); must(()=>log("unsubscribe")); return e.source;
});
const c = cached(s, "unobserved");
// When the signal is created, it should equal the initial value
expect(c()).to.equal("unobserved");
// And emitting values should have no effect on it, nor produce output
e("testing"); see(); expect(c()).to.equal("unobserved");
// But once the signal is observed by a rule
const r = rule(() => log(c())); runRules();
// The source should be susbcribed
see("subscribe", "unobserved");
// And emitting values should update the signal and fire the rule
e("test 1"); runRules(); see("test 1"); expect(c()).to.equal("test 1");
e("test 2"); runRules(); see("test 2"); expect(c()).to.equal("test 2");
// But duplicate values should not fire the rule
e("test 2"); runRules(); see(); expect(c()).to.equal("test 2");
// And if the rule is disposed of, the source should unsubscribe
r(); see("unsubscribe");
// And the value should revert to the initial value
expect(c()).to.equal("unobserved");
});
});
});

describe("Dependency tracking", () => {
Expand Down
28 changes: 14 additions & 14 deletions src/cells.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,11 @@ export const rule = defaultQueue.rule;
export const runRules = defaultQueue.flush


/** recalcWhen(fn): map fn -> Cell */
const fntrackers = new WeakMap<Function, Cell>();
/** recalcWhen(fn): map fn -> signal */
const fntrackers = new WeakMap<Function, () => number>();

/** recalcWhen(key, factory): map factory -> key -> Cell */
const obtrackers = new WeakMap<Function, WeakMap<WeakKey, Cell>>();
/** recalcWhen(key, factory): map factory -> key -> signal */
const obtrackers = new WeakMap<Function, WeakMap<WeakKey, () => number>>();

const dirtyStack: Cell[] = [];

Expand Down Expand Up @@ -432,15 +432,15 @@ export class Cell {
return cell;
}

static mkStream<T>(src: Source<T>, val?: T) {
static mkStream<T>(src: Source<T>, val?: T): () => T {
const cell = this.mkValue(val);
cell.flags |= Is.Stream;
cell.ctx = makeCtx();
const write = cell.setValue.bind(cell);
cell.compute = () => {
cell.ctx.job ||= makeJob()
.asyncCatch(e => detached.asyncThrow(e))
.must(r => isCancel(r) || (cell.ctx.job = undefined))
.must(r => { cell.value = val; isCancel(r) || (cell.ctx.job = undefined); })
;
const old = swapCtx(cell.ctx);
try {
Expand All @@ -453,27 +453,27 @@ export class Cell {
swapCtx(old);
}
}
return cell;
return cell.getValue.bind(cell);
}

recalcWhen(src: RecalcSource): void;
recalcWhen<T extends WeakKey>(key: T, factory: (key: T) => RecalcSource): void;
recalcWhen<T extends WeakKey>(fnOrKey: T | RecalcSource, fn?: (key: T) => RecalcSource) {
let trackers: WeakMap<WeakKey, Cell> = fn ?
let trackers: WeakMap<WeakKey, () => number> = fn ?
obtrackers.get(fn) || setMap(obtrackers, fn, new WeakMap) :
fntrackers
;
let cell = trackers.get(fnOrKey);
if (!cell) {
let signal = trackers.get(fnOrKey);
if (!signal) {
const src = fn ? fn(<T>fnOrKey) : <RecalcSource> fnOrKey;
let ct = 0;
cell = Cell.mkStream(s => (src(() => s(++ct)), IsStream), ct);
trackers.set(fnOrKey, cell);
signal = Cell.mkStream(s => (src(() => s(++ct)), IsStream), ct);
trackers.set(fnOrKey, signal);
}
cell.getValue(); // Subscribe to the cell
signal(); // Subscribe to the cell
}

static mkCached<T>(compute: () => T) {
static mkCached<T>(compute: () => T): () => T {
const cell = new Cell;
cell.compute = compute;
cell.ctx = makeCtx(null, cell);
Expand Down
36 changes: 32 additions & 4 deletions src/signals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { PlainFunction, Yielding, RecalcSource } from "./types.ts";
import { Cell, rule } from "./cells.ts";
import { reject, resolve } from "./results.ts";
import { UntilMethod } from "./sinks.ts";
import { Source } from "./streams.ts";
export { RuleScheduler, rule, runRules, WriteConflict, CircularDependency } from "./cells.ts";

export interface Signal<T> {
Expand Down Expand Up @@ -83,15 +84,42 @@ export function value<T>(val?: T): Writable<T> {
}

/**
* Create a cached version of a function. The returned callable is also a {@link Signal}.
* Create a cached version of a function. The returned callable is also a
* {@link Signal}.
*
* Note: If the supplied function has a non-zero `.length` (i.e., it explicitly
* takes arguments), it is assumed to be a {@link Source}, and the second
* calling signature below will apply, even if TypeScript doesn't see it that
* way!)
*
* @category Signals
*/
export function cached<T>(compute: () => T): Signal<T>
export function cached<T>(compute: () => T): Signal<T>;

/**
* If the supplied function has a non-zero `.length` (i.e., it explicitly takes
* arguments), it is assumed to be a {@link Source}, and the second argument is
* a default value for the created signal to use as default value until the
* source produces a value.
*
* The source will be subscribed *only* while the signal is subscribed as a
* stream, or observed (directly or indirectly) by a rule. While subscribed,
* the signal will update itself with the most recent value produced by the
* source, triggering rules or events as appropriate if the value changes. When
* the signal is once again unobserved, it will revert to the supplied inital
* value.
*
* @param source A {@link Source} providing data which will become this signal's value
* @param initVal The value to use when the signal is unobserved or waiting for the
* first item from the source.
*/
export function cached<T>(source: Source<T>, initVal?: T): Signal<T>;
export function cached<T extends Signal<any>>(signal: T): T
export function cached<T>(compute: () => T): Signal<T> {
export function cached<T>(compute: Source<T> | (() => T), initVal?: T): Signal<T> {
if (compute instanceof Signal) return compute;
return mkSignal(Cell.mkCached(compute));
return mkSignal(
compute.length ? Cell.mkStream(compute as Source<T>, initVal) : Cell.mkCached(compute as () => T)
);
}

/**
Expand Down

0 comments on commit 27ac4ba

Please sign in to comment.