diff --git a/packages/rstream-graph/src/api.ts b/packages/rstream-graph/src/api.ts index a0ac852bfe..4c3ba3879a 100644 --- a/packages/rstream-graph/src/api.ts +++ b/packages/rstream-graph/src/api.ts @@ -2,7 +2,7 @@ import { ISubscribable } from "@thi.ng/rstream/api"; import { Transducer } from "@thi.ng/transducers/api"; import { IObjectOf } from "@thi.ng/api/api"; -export type MultiInputNodeFn = (src: ISubscribable[]) => ISubscribable; +export type NodeFactory = (src: ISubscribable[], id: string) => ISubscribable; /** * A dataflow graph spec is simply an object where keys are node names @@ -22,7 +22,7 @@ export type GraphSpec = IObjectOf; * See `initGraph` and `nodeFromSpec` for more details (in /src/nodes.ts) */ export interface NodeSpec { - fn: (src: ISubscribable[]) => ISubscribable; + fn: NodeFactory; ins: NodeInput[]; out?: NodeOutput; } diff --git a/packages/rstream-graph/src/graph.ts b/packages/rstream-graph/src/graph.ts index 4c3b732dc2..9fa884851a 100644 --- a/packages/rstream-graph/src/graph.ts +++ b/packages/rstream-graph/src/graph.ts @@ -10,7 +10,7 @@ import { sync, StreamSync } from "@thi.ng/rstream/stream-sync"; import { Subscription } from "@thi.ng/rstream/subscription"; import { Transducer } from "@thi.ng/transducers/api"; -import { NodeSpec } from "./api"; +import { NodeSpec, NodeFactory } from "./api"; /** * Dataflow graph initialization function. Takes an object of @@ -23,8 +23,8 @@ import { NodeSpec } from "./api"; * @param nodes */ export const initGraph = (state: IAtom, nodes: IObjectOf): IObjectOf> => { - for (let k in nodes) { - (nodes)[k] = nodeFromSpec(state, nodes[k]); + for (let id in nodes) { + (nodes)[id] = nodeFromSpec(state, nodes[id], id); } return resolveMap(nodes); }; @@ -43,7 +43,7 @@ export const initGraph = (state: IAtom, nodes: IObjectOf): IObjec * * @param spec */ -const nodeFromSpec = (state: IAtom, spec: NodeSpec) => (resolve) => { +const nodeFromSpec = (state: IAtom, spec: NodeSpec, id: string) => (resolve) => { const src: ISubscribable[] = []; for (let i of spec.ins) { let s; @@ -63,10 +63,10 @@ const nodeFromSpec = (state: IAtom, spec: NodeSpec) => (resolve) => { } src.push(s); } - const node = spec.fn(src); + const node = spec.fn(src, id); if (spec.out) { if (isString(spec.out)) { - ((path) => node.subscribe({ next: (x) => state.resetIn(path, x) }))(spec.out); + ((path) => node.subscribe({ next: (x) => state.resetIn(path, x) }, `out-${id}`))(spec.out); } else { spec.out(node); } @@ -75,7 +75,7 @@ const nodeFromSpec = (state: IAtom, spec: NodeSpec) => (resolve) => { }; export const addNode = (graph: IObjectOf>, state: IAtom, id: string, spec: NodeSpec) => - graph[id] = nodeFromSpec(state, spec)((nodeID) => graph[nodeID]); + graph[id] = nodeFromSpec(state, spec, id)((nodeID) => graph[nodeID]); export const removeNode = (graph: IObjectOf>, id: string) => { if (graph[id]) { @@ -94,12 +94,12 @@ export const removeNode = (graph: IObjectOf>, id: string) => * @param xform * @param arity */ -export const node = (xform: Transducer, any>, arity?: number) => - (src: ISubscribable[]): StreamSync => { +export const node = (xform: Transducer, any>, arity?: number): NodeFactory => + (src: ISubscribable[], id: string): StreamSync => { if (arity !== undefined && src.length !== arity) { illegalArgs(`wrong number of inputs: got ${src.length}, but needed ${arity}`); } - return sync({ src, xform, reset: false }); + return sync({ src, xform, reset: false, id }); }; /** @@ -107,5 +107,5 @@ export const node = (xform: Transducer, any>, arity?: number) => * * @param xform */ -export const node1 = (xform: Transducer) => - ([src]: ISubscribable[]): Subscription => src.subscribe(xform); +export const node1 = (xform: Transducer): NodeFactory => + ([src]: ISubscribable[], id: string): Subscription => src.subscribe(xform, id); diff --git a/packages/rstream-graph/src/nodes/extract.ts b/packages/rstream-graph/src/nodes/extract.ts index 7a488f74e3..d12f2074c9 100644 --- a/packages/rstream-graph/src/nodes/extract.ts +++ b/packages/rstream-graph/src/nodes/extract.ts @@ -1,7 +1,7 @@ import { Path, getIn } from "@thi.ng/paths"; import { map } from "@thi.ng/transducers/xform/map"; -import { MultiInputNodeFn } from "../api"; +import { NodeFactory } from "../api"; import { node1 } from "../graph"; /** @@ -9,5 +9,5 @@ import { node1 } from "../graph"; * * Inputs: 1 */ -export const extract = (path: Path): MultiInputNodeFn => +export const extract = (path: Path): NodeFactory => node1(map((x) => getIn(x, path))); diff --git a/packages/rstream-graph/src/nodes/math.ts b/packages/rstream-graph/src/nodes/math.ts index 5fbd00c2cc..041b2a63bb 100644 --- a/packages/rstream-graph/src/nodes/math.ts +++ b/packages/rstream-graph/src/nodes/math.ts @@ -1,7 +1,7 @@ import { IObjectOf } from "@thi.ng/api/api"; import { map } from "@thi.ng/transducers/xform/map"; -import { MultiInputNodeFn } from "../api"; +import { NodeFactory } from "../api"; import { node } from "../graph"; /** @@ -9,7 +9,7 @@ import { node } from "../graph"; * * Inputs: any */ -export const add: MultiInputNodeFn = node( +export const add: NodeFactory = node( map((ports: IObjectOf) => { let acc = 0; let v; @@ -25,7 +25,7 @@ export const add: MultiInputNodeFn = node( * * Inputs: any */ -export const mul: MultiInputNodeFn = node( +export const mul: NodeFactory = node( map((ports: IObjectOf) => { let acc = 1; let v; @@ -41,7 +41,7 @@ export const mul: MultiInputNodeFn = node( * * Inputs: 2 */ -export const sub: MultiInputNodeFn = +export const sub: NodeFactory = node(map((ports: IObjectOf) => ports.a - ports.b), 2); /** @@ -49,5 +49,5 @@ export const sub: MultiInputNodeFn = * * Inputs: 2 */ -export const div: MultiInputNodeFn = +export const div: NodeFactory = node(map((ports: IObjectOf) => ports.a / ports.b), 2);