Skip to content

Commit

Permalink
feat(rstream-graph): add IDs for all generated nodes, rename factory …
Browse files Browse the repository at this point in the history
…type

- MultiInputNodeFn => NodeFactory
  • Loading branch information
postspectacular committed Apr 24, 2018
1 parent e72478a commit 0153903
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
4 changes: 2 additions & 2 deletions packages/rstream-graph/src/api.ts
Expand Up @@ -2,7 +2,7 @@ import { ISubscribable } from "@thi.ng/rstream/api";
import { Transducer } from "@thi.ng/transducers/api";
import { IObjectOf } from "@thi.ng/api/api";

export type MultiInputNodeFn<T> = (src: ISubscribable<any>[]) => ISubscribable<T>;
export type NodeFactory<T> = (src: ISubscribable<any>[], id: string) => ISubscribable<T>;

/**
* A dataflow graph spec is simply an object where keys are node names
Expand All @@ -22,7 +22,7 @@ export type GraphSpec = IObjectOf<NodeSpec>;
* See `initGraph` and `nodeFromSpec` for more details (in /src/nodes.ts)
*/
export interface NodeSpec {
fn: (src: ISubscribable<any>[]) => ISubscribable<any>;
fn: NodeFactory<any>;
ins: NodeInput[];
out?: NodeOutput;
}
Expand Down
24 changes: 12 additions & 12 deletions packages/rstream-graph/src/graph.ts
Expand Up @@ -10,7 +10,7 @@ import { sync, StreamSync } from "@thi.ng/rstream/stream-sync";
import { Subscription } from "@thi.ng/rstream/subscription";
import { Transducer } from "@thi.ng/transducers/api";

import { NodeSpec } from "./api";
import { NodeSpec, NodeFactory } from "./api";

/**
* Dataflow graph initialization function. Takes an object of
Expand All @@ -23,8 +23,8 @@ import { NodeSpec } from "./api";
* @param nodes
*/
export const initGraph = (state: IAtom<any>, nodes: IObjectOf<NodeSpec>): IObjectOf<ISubscribable<any>> => {
for (let k in nodes) {
(<any>nodes)[k] = nodeFromSpec(state, nodes[k]);
for (let id in nodes) {
(<any>nodes)[id] = nodeFromSpec(state, nodes[id], id);
}
return resolveMap(nodes);
};
Expand All @@ -43,7 +43,7 @@ export const initGraph = (state: IAtom<any>, nodes: IObjectOf<NodeSpec>): IObjec
*
* @param spec
*/
const nodeFromSpec = (state: IAtom<any>, spec: NodeSpec) => (resolve) => {
const nodeFromSpec = (state: IAtom<any>, spec: NodeSpec, id: string) => (resolve) => {
const src: ISubscribable<any>[] = [];
for (let i of spec.ins) {
let s;
Expand All @@ -63,10 +63,10 @@ const nodeFromSpec = (state: IAtom<any>, spec: NodeSpec) => (resolve) => {
}
src.push(s);
}
const node = spec.fn(src);
const node = spec.fn(src, id);
if (spec.out) {
if (isString(spec.out)) {
((path) => node.subscribe({ next: (x) => state.resetIn(path, x) }))(spec.out);
((path) => node.subscribe({ next: (x) => state.resetIn(path, x) }, `out-${id}`))(spec.out);
} else {
spec.out(node);
}
Expand All @@ -75,7 +75,7 @@ const nodeFromSpec = (state: IAtom<any>, spec: NodeSpec) => (resolve) => {
};

export const addNode = (graph: IObjectOf<ISubscribable<any>>, state: IAtom<any>, id: string, spec: NodeSpec) =>
graph[id] = nodeFromSpec(state, spec)((nodeID) => graph[nodeID]);
graph[id] = nodeFromSpec(state, spec, id)((nodeID) => graph[nodeID]);

export const removeNode = (graph: IObjectOf<ISubscribable<any>>, id: string) => {
if (graph[id]) {
Expand All @@ -94,18 +94,18 @@ export const removeNode = (graph: IObjectOf<ISubscribable<any>>, id: string) =>
* @param xform
* @param arity
*/
export const node = (xform: Transducer<IObjectOf<any>, any>, arity?: number) =>
(src: ISubscribable<any>[]): StreamSync<any, any> => {
export const node = (xform: Transducer<IObjectOf<any>, any>, arity?: number): NodeFactory<any> =>
(src: ISubscribable<any>[], id: string): StreamSync<any, any> => {
if (arity !== undefined && src.length !== arity) {
illegalArgs(`wrong number of inputs: got ${src.length}, but needed ${arity}`);
}
return sync({ src, xform, reset: false });
return sync({ src, xform, reset: false, id });
};

/**
* Syntax sugar / helper fn for nodes using only single input.
*
* @param xform
*/
export const node1 = (xform: Transducer<any, any>) =>
([src]: ISubscribable<any>[]): Subscription<any, any> => src.subscribe(xform);
export const node1 = (xform: Transducer<any, any>): NodeFactory<any> =>
([src]: ISubscribable<any>[], id: string): Subscription<any, any> => src.subscribe(xform, id);
4 changes: 2 additions & 2 deletions packages/rstream-graph/src/nodes/extract.ts
@@ -1,13 +1,13 @@
import { Path, getIn } from "@thi.ng/paths";
import { map } from "@thi.ng/transducers/xform/map";

import { MultiInputNodeFn } from "../api";
import { NodeFactory } from "../api";
import { node1 } from "../graph";

/**
* Nested value extraction node. Higher order function.
*
* Inputs: 1
*/
export const extract = (path: Path): MultiInputNodeFn<any> =>
export const extract = (path: Path): NodeFactory<any> =>
node1(map((x) => getIn(x, path)));
10 changes: 5 additions & 5 deletions packages/rstream-graph/src/nodes/math.ts
@@ -1,15 +1,15 @@
import { IObjectOf } from "@thi.ng/api/api";
import { map } from "@thi.ng/transducers/xform/map";

import { MultiInputNodeFn } from "../api";
import { NodeFactory } from "../api";
import { node } from "../graph";

/**
* Addition node.
*
* Inputs: any
*/
export const add: MultiInputNodeFn<number> = node(
export const add: NodeFactory<number> = node(
map((ports: IObjectOf<number>) => {
let acc = 0;
let v;
Expand All @@ -25,7 +25,7 @@ export const add: MultiInputNodeFn<number> = node(
*
* Inputs: any
*/
export const mul: MultiInputNodeFn<number> = node(
export const mul: NodeFactory<number> = node(
map((ports: IObjectOf<number>) => {
let acc = 1;
let v;
Expand All @@ -41,13 +41,13 @@ export const mul: MultiInputNodeFn<number> = node(
*
* Inputs: 2
*/
export const sub: MultiInputNodeFn<number> =
export const sub: NodeFactory<number> =
node(map((ports: IObjectOf<number>) => ports.a - ports.b), 2);

/**
* Division node.
*
* Inputs: 2
*/
export const div: MultiInputNodeFn<number> =
export const div: NodeFactory<number> =
node(map((ports: IObjectOf<number>) => ports.a / ports.b), 2);

0 comments on commit 0153903

Please sign in to comment.