diff --git a/packages/transducers/README.md b/packages/transducers/README.md index d1f19ed7f1..bd1e20c933 100644 --- a/packages/transducers/README.md +++ b/packages/transducers/README.md @@ -605,7 +605,7 @@ reducer and optional initial accumulator/result. #### `keep(f?: ((x: T) => any)): Transducer` -#### `labeled(id: L): Transducer` +#### `labeled(id: L | ((x: T) => L)): Transducer` #### `map(fn: (x: A) => B): Transducer` @@ -613,7 +613,7 @@ reducer and optional initial accumulator/result. #### `mapDeep(spec: TransformSpec): Transducer` -#### `mapIndexed(fn: (i: number, x: A) => B): Transducer` +#### `mapIndexed(fn: (i: number, x: A) => B, offset = 0): Transducer` #### `mapKeys(keys: IObjectOf<(x: any) => any>, copy?: boolean): Transducer` @@ -641,6 +641,8 @@ reducer and optional initial accumulator/result. #### `partitionSort(n: number, key?: ((x: A) => B), cmp?: Comparator): Transducer` +#### `partitionSync<T>(keys: PropertyKey[] | Set<PropertyKey>, keyfn: (x: T) => PropertyKey, reset = true, all = true): Transducer<T, IObjectOf<T>>` + #### `pluck(key: PropertyKey): Transducer` #### `rename(kmap: IObjectOf, rfn?: Reducer): Transducer` diff --git a/packages/transducers/src/index.ts b/packages/transducers/src/index.ts index e38b33e1c4..13ee5b9186 100644 --- a/packages/transducers/src/index.ts +++ b/packages/transducers/src/index.ts @@ -66,6 +66,7 @@ export * from "./xform/page"; export * from "./xform/partition-by"; export * from "./xform/partition-of"; export * from "./xform/partition-sort"; +export * from "./xform/partition-sync"; export * from "./xform/partition"; export * from "./xform/pluck"; export * from "./xform/rename"; diff --git a/packages/transducers/src/xform/partition-sync.ts b/packages/transducers/src/xform/partition-sync.ts new file mode 100644 index 0000000000..0240cfc092 --- /dev/null +++ b/packages/transducers/src/xform/partition-sync.ts @@ -0,0 +1,94 @@ +import { isArray } from "@thi.ng/checks/is-array"; + +import { IObjectOf } from "@thi.ng/api/api"; +import { Reducer, 
Transducer } from "../api"; + +/** + * This transducer is intended for synchronization and provenance + * tracking of possibly previously merged inputs. It partitions the + * input into labeled tuple objects with the object keys obtained from + * the user provided `keyfn`. A new result is only produced once values + * from **all** given labeled sources have been consumed. Only labels + * contained in the provided key set are allowed, others are skipped. + * The tuples will contain the most recently consumed value from each + * labeled input. In dataflow scenarios this can be used to ensure a + * subsequent operation consuming these tuples has all necessary inputs, + * regardless of the individual rates of change of each original + * (pre-merge) input. + * + * ``` + * src = [ + * ["a", 1], ["a", 2], ["d", 100], ["b", 10], + * ["b", 11], ["c", 0], ["a", 3] + * ]; + * // form tuples for values only from sources "a" & "b" + * // here the label is the first element of each input item + * [...iterator(partitionSync(["a", "b"], (x) => x[0]), src)] + * // [ { a: ["a", 2], b: ["b", 10] }, + * // { b: ["b", 11], a: ["a", 3] } ] + * ``` + * + * In addition to the default mode of operation, i.e. waiting for new + * values from *all* named inputs before a new tuple is produced, the + * behavior for *all but the first tuple* can be changed to emit new + * tuples as soon as a new value with a qualifying label has become + * available (with other values in the tuple remaining). Compare with + * above example: + * + * ``` + * // passing `false` to disable tuple reset + * [...iterator(partitionSync(["a", "b"], (x) => x[0], false), src)] + * // [ { a: ["a", 2], b: ["b", 10] }, + * // { a: ["a", 2], b: ["b", 11] }, + * // { a: ["a", 3], b: ["b", 11] } ] + * ``` + * + * By default, the last emitted tuple is allowed to be incomplete (in + * case the input closed). To only allow complete tuples, set the + * optional `all` arg to false. 
/**
 * Transducer which synchronizes values from several labeled sources
 * (e.g. inputs which have been merged earlier). The label of each input
 * is obtained via `keyfn`; inputs whose label is not part of `keys` are
 * skipped. The most recently consumed value per label is stored in a
 * tuple object (keyed by label) and a tuple is passed downstream as
 * soon as it holds a value for every allowed label.
 *
 * ```
 * src = [
 *     ["a", 1], ["a", 2], ["d", 100], ["b", 10],
 *     ["b", 11], ["c", 0], ["a", 3]
 * ];
 * // form tuples for values only from sources "a" & "b"
 * // here the label is the first element of each input item
 * [...iterator(partitionSync(["a", "b"], (x) => x[0]), src)]
 * // [ { a: ["a", 2], b: ["b", 10] },
 * //   { b: ["b", 11], a: ["a", 3] } ]
 * ```
 *
 * With `reset` enabled (default) every emitted tuple starts from
 * scratch, i.e. it only contains values received after the previous
 * tuple. If `reset` is false, only the first tuple waits for all
 * labels; afterwards each further qualifying input emits a new tuple
 * (a shallow copy in which the other values are retained):
 *
 * ```
 * [...iterator(partitionSync(["a", "b"], (x) => x[0], false), src)]
 * // [ { a: ["a", 2], b: ["b", 10] },
 * //   { a: ["a", 2], b: ["b", 11] },
 * //   { a: ["a", 3], b: ["b", 11] } ]
 * ```
 *
 * When the input ends, a pending *incomplete* tuple is emitted only if
 * `all` is true and the tuple holds at least one value. In no-reset
 * mode this only applies as long as no tuple has been emitted yet
 * (later tuples are always complete).
 *
 * Note: If `keys` is given as a Set it is used as is, so modifying it
 * externally changes the number of labels required per tuple. An array
 * is copied into a new Set once and later changes to it have no effect.
 *
 * @param keys allowed label set
 * @param keyfn label extraction function
 * @param reset true if each tuple should contain only new values
 * @param all true if last tuple is allowed to be incomplete
 * @returns transducer producing label-keyed tuple objects
 */
export function partitionSync<T>(
    keys: PropertyKey[] | Set<PropertyKey>,
    keyfn: (x: T) => PropertyKey,
    reset = true,
    all = true
): Transducer<T, IObjectOf<T>> {
    return ([init, complete, reduce]: Reducer<any, IObjectOf<T>>) => {
        // tuple currently being assembled
        let curr: IObjectOf<T> = {};
        // true until the first tuple has been emitted
        let first = true;
        // labels for which `curr` already holds a value
        const currKeys = new Set<PropertyKey>();
        // a user supplied Set is shared on purpose (see doc comment)
        const ks: Set<PropertyKey> = isArray(keys)
            ? new Set<PropertyKey>(keys)
            : keys;
        return [
            init,
            (acc) => {
                // Flush a pending partial tuple, but only if partial
                // tuples are allowed and there is something to flush.
                // In no-reset mode `currKeys` is never cleared after the
                // first emit, hence the additional `first` check.
                if (all && currKeys.size > 0 && (reset || first)) {
                    acc = reduce(acc, curr);
                    // keep `curr` a valid object in case the step
                    // function is invoked again after completion
                    curr = {};
                    currKeys.clear();
                    first = false;
                }
                return complete(acc);
            },
            (acc, x: T) => {
                const k = keyfn(x);
                if (ks.has(k)) {
                    // labels may be numbers or symbols at runtime; the
                    // cast only satisfies the string index signature
                    curr[k as string] = x;
                    currKeys.add(k);
                    // `>=` since `ks` might have shrunk externally
                    if (currKeys.size >= ks.size) {
                        acc = reduce(acc, curr);
                        first = false;
                        if (reset) {
                            curr = {};
                            currKeys.clear();
                        } else {
                            // copy, so the tuple just emitted is not
                            // mutated by subsequent inputs
                            curr = { ...curr };
                        }
                    }
                }
                return acc;
            },
        ];
    };
}