Skip to content

Commit

Permalink
feat(transducers): add partitionSync() xform
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Mar 19, 2018
1 parent 627dd7a commit bebd118
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 2 deletions.
6 changes: 4 additions & 2 deletions packages/transducers/README.md
Expand Up @@ -605,15 +605,15 @@ reducer and optional initial accumulator/result.

#### `keep<T>(f?: ((x: T) => any)): Transducer<T, T>`

#### `labeled<L, T>(id: L): Transducer<T, [L, T]>`
#### `labeled<L, T>(id: L | ((x: T) => L)): Transducer<T, [L, T]>`

#### `map<A, B>(fn: (x: A) => B): Transducer<A, B>`

#### `mapcat<A, B>(fn: (x: A) => Iterable<B>): Transducer<A, B>`

#### `mapDeep(spec: TransformSpec): Transducer<any, any>`

#### `mapIndexed<A, B>(fn: (i: number, x: A) => B): Transducer<A, B>`
#### `mapIndexed<A, B>(fn: (i: number, x: A) => B, offset = 0): Transducer<A, B>`

#### `mapKeys(keys: IObjectOf<(x: any) => any>, copy?: boolean): Transducer<any, any>`

Expand Down Expand Up @@ -641,6 +641,8 @@ reducer and optional initial accumulator/result.

#### `partitionSort<A, B>(n: number, key?: ((x: A) => B), cmp?: Comparator<B>): Transducer<A, A>`

#### `partitionSync<T>(keys: PropertyKey[] | Set<PropertyKey>, keyfn: (x: T) => PropertyKey, reset = true, all = true): Transducer<T, IObjectOf<T>>`

#### `pluck<A, B>(key: PropertyKey): Transducer<A, B>`

#### `rename<A, B>(kmap: IObjectOf<PropertyKey>, rfn?: Reducer<B, [PropertyKey, A]>): Transducer<A[], B>`
Expand Down
1 change: 1 addition & 0 deletions packages/transducers/src/index.ts
Expand Up @@ -66,6 +66,7 @@ export * from "./xform/page";
export * from "./xform/partition-by";
export * from "./xform/partition-of";
export * from "./xform/partition-sort";
export * from "./xform/partition-sync";
export * from "./xform/partition";
export * from "./xform/pluck";
export * from "./xform/rename";
Expand Down
94 changes: 94 additions & 0 deletions packages/transducers/src/xform/partition-sync.ts
@@ -0,0 +1,94 @@
import { isArray } from "@thi.ng/checks/is-array";

import { IObjectOf } from "@thi.ng/api/api";
import { Reducer, Transducer } from "../api";

/**
* This transducer is intended for synchronization and provenance
* tracking of possibly previously merged inputs. It partitions the
* input into labeled tuple objects with the object keys obtained from
* the user provided `keyfn`. A new result is only produced once values
* from **all** given labeled sources have been consumed. Only labels
* contained in the provided key set are allowed, others are skipped.
* The tuples will contain the most recent consumed value from each
* labeled input. In dataflow scenarios this can be used to ensure a
* subsequent operation consuming these tuples has all necessary inputs,
* regardless of the individual rates of change of each original
* (pre-merge) input.
*
* ```
* src = [
* ["a", 1], ["a", 2], ["d", 100], ["b", 10],
* ["b", 11], ["c", 0], ["a", 3]
* ];
* // form tuples for values only from sources "a" & "b"
* // here the label is the first element of each input item
* [...iterator(partitionSync(["a", "b"], (x) => x[0]), src)]
* // [ { a: ["a", 2], b: ["b", 10] },
* // { b: ["b", 11], a: ["a", 3] } ]
* ```
*
* In addition to the default mode of operation, i.e. waiting for new
* values from *all* named inputs before a new tuple is produced, the
* behavior for *all but the first tuple* can be changed to emit new
* tuples as soon as a new value with a qualifying label has become
* available (with other values in the tuple remaining). Compare with
* above example:
*
* ```
* // passing `false` to disable tuple reset
* [...tx.iterator(tx.partitionSync(["a", "b"], (x) => x[0], false), src)]
* // [ { a: ["a", 2], b: ["b", 10] },
* // { a: ["a", 2], b: ["b", 11] },
* // { a: ["a", 3], b: ["b", 11] } ]
*
* By default, the last emitted tuple is allowed to be incomplete (in
* case the input closed). To only allow complete tuples, set the
* optional `all` arg to false.
*
* Note: If the `keys` set of allowed labels is modified externally, the
* tuple size will adjust accordingly (only if given as set, will not work
* if keys are provided as array).
*
* @param keys allowed label set
* @param keyfn label extraction function
* @param reset true if each tuple should contain only new values
* @param all true if last tuple is allowed to be incomplete
*/
export function partitionSync<T>(keys: PropertyKey[] | Set<PropertyKey>, keyfn: (x: T) => PropertyKey, reset = true, all = true): Transducer<T, IObjectOf<T>> {
return ([init, complete, reduce]: Reducer<any, IObjectOf<T>>) => {
let curr = {};
let first = true;
const currKeys = new Set<PropertyKey>();
const ks = isArray(keys) ? new Set(keys) : keys;
return [
init,
(acc) => {
if ((reset && all && currKeys.size > 0) || (!reset && first)) {
acc = reduce(acc, curr);
curr = undefined;
currKeys.clear();
first = false;
}
return complete(acc);
},
(acc, x: T) => {
const k = keyfn(x);
if (ks.has(k)) {
curr[k] = x;
currKeys.add(k);
if (currKeys.size >= ks.size) {
acc = reduce(acc, curr);
first = false;
if (reset) {
curr = {};
currKeys.clear();
} else {
curr = { ...curr };
}
}
}
return acc;
}];
};
}

0 comments on commit bebd118

Please sign in to comment.