Skip to content

Commit

Permalink
feat(combine): change API for combine() operator
Browse files Browse the repository at this point in the history
BREAKING CHANGE:
combine() now takes only streams as argument, no more project function. combine() will return an
stream that emits arrays of the collected values from each input stream. To transform that array,
you should now use map() operator after combine(), to take the array of collected values and return
a combination value. See tests for examples.
  • Loading branch information
staltz committed Jun 12, 2016
1 parent 17f0b95 commit a2aa0a6
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 172 deletions.
5 changes: 4 additions & 1 deletion perf/combine.js
Expand Up @@ -30,6 +30,9 @@ var options = {
function add3(a, b, c) {
return a + b + c;
}
function add3Arr(arr) {
return arr[0] + arr[1] + arr[2];
}

var xs1 = xs.fromArray(a);
var xs2 = xs.fromArray(a);
Expand All @@ -46,7 +49,7 @@ var rx3 = rxjs.Observable.from(a);
suite
.add('xstream', function(deferred) {
runners.runXStream(deferred,
xs.combine(add3, xs1, xs2, xs3).filter(even));
xs.combine(xs1, xs2, xs3).map(add3Arr).filter(even));
}, options)
.add('most', function(deferred) {
runners.runMost(deferred,
Expand Down
230 changes: 98 additions & 132 deletions src/core.ts
Expand Up @@ -62,18 +62,6 @@ function internalizeProducer<T>(producer: Producer<T>) {
(<InternalProducer<T>> (<any> producer))._stop = producer.stop;
}

function invoke(f: Function, args: Array<any>) {
switch (args.length) {
case 0: return f();
case 1: return f(args[0]);
case 2: return f(args[0], args[1]);
case 3: return f(args[0], args[1], args[2]);
case 4: return f(args[0], args[1], args[2], args[3]);
case 5: return f(args[0], args[1], args[2], args[3], args[4]);
default: return f.apply(void 0, args);
}
}

function compose2<T, U>(f1: (t: T) => any, f2: (t: T) => any): (t: T) => any {
return function composedFn(arg: T): any {
return f1(f2(arg));
Expand All @@ -86,39 +74,84 @@ function and<T>(f1: (t: T) => boolean, f2: (t: T) => boolean): (t: T) => boolean
};
}

export interface CombineProjectFunction {
<T1, T2, R>(v1: T1, v2: T2): R;
<T1, T2, T3, R>(v1: T1, v2: T2, v3: T3): R;
<T1, T2, T3, T4, R>(v1: T1, v2: T2, v3: T3, v4: T4): R;
<T1, T2, T3, T4, T5, R>(v1: T1, v2: T2, v3: T3, v4: T4, v5: T5): R;
<T1, T2, T3, T4, T5, T6, R>(v1: T1, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6): R;
<R>(...values: Array<any>): R;
export class MergeProducer<T> implements InternalProducer<T>, InternalListener<T> {
public type = 'merge';
private out: InternalListener<T> = emptyListener;
private ac: number; // ac is activeCount, starts initialized

constructor(public streams: Array<Stream<T>>) {
this.ac = streams.length;
}

_start(out: InternalListener<T>): void {
this.out = out;
const s = this.streams;
const L = s.length;
for (let i = 0; i < L; i++) {
s[i]._add(this);
}
}

_stop(): void {
const s = this.streams;
const L = s.length;
for (let i = 0; i < L; i++) {
s[i]._remove(this);
}
this.out = null;
this.ac = L;
}

_n(t: T) {
const u = this.out;
if (!u) return;
u._n(t);
}

_e(err: any) {
const u = this.out;
if (!u) return;
u._e(err);
}

_c() {
if (--this.ac === 0) {
const u = this.out;
if (!u) return;
u._c();
}
}
}

export interface CombineFactorySignature {
<T1, T2, R>(
project: (t1: T1, t2: T2) => R,
stream1: Stream<T1>,
stream2: Stream<T2>): Stream<R>;
<T1, T2, T3, R>(
project: (t1: T1, t2: T2, t3: T3) => R,
stream1: Stream<T1>,
stream2: Stream<T2>,
stream3: Stream<T3>): Stream<R>;
<T1, T2, T3, T4, R>(
project: (t1: T1, t2: T2, t3: T3, t4: T4) => R,
stream1: Stream<T1>,
stream2: Stream<T2>,
stream3: Stream<T3>,
stream4: Stream<T4>): Stream<R>;
<T1, T2, T3, T4, T5, R>(
project: (t1: T1, t2: T2, t3: T3, t4: T4, t5: T5) => R,
stream1: Stream<T1>,
stream2: Stream<T2>,
stream3: Stream<T3>,
stream4: Stream<T4>,
stream5: Stream<T5>): Stream<R>;
<R>(project: (...args: Array<any>) => R, ...streams: Array<Stream<any>>): Stream<R>;
export interface CombineSignature {
(): Stream<Array<any>>;
<T1>(s1: Stream<T1>): Stream<[T1]>;
<T1, T2>(
s1: Stream<T1>,
s2: Stream<T2>): Stream<[T1, T2]>;
<T1, T2, T3>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>): Stream<[T1, T2, T3]>;
<T1, T2, T3, T4>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>,
s4: Stream<T4>): Stream<[T1, T2, T3, T4]>;
<T1, T2, T3, T4, T5>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>,
s4: Stream<T4>,
s5: Stream<T5>): Stream<[T1, T2, T3, T4, T5]>;
<T1, T2, T3, T4, T5, T6>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>,
s4: Stream<T4>,
s5: Stream<T5>,
s6: Stream<T6>): Stream<[T1, T2, T3, T4, T5, T6]>;
(...stream: Array<Stream<any>>): Stream<Array<any>>;
}

export class CombineListener<T> implements InternalListener<T> {
Expand All @@ -131,11 +164,7 @@ export class CombineListener<T> implements InternalListener<T> {
const p = this.p, out = p.out;
if (!out) return;
if (p.up(t, this.i)) {
try {
out._n(invoke(p.project, p.vals));
} catch (e) {
out._e(e);
}
out._n(p.vals);
}
}

Expand All @@ -154,16 +183,15 @@ export class CombineListener<T> implements InternalListener<T> {
}
}

export class CombineProducer<R> implements InternalProducer<R> {
export class CombineProducer<R> implements InternalProducer<Array<R>> {
public type = 'combine';
public out: InternalListener<R> = emptyListener;
public out: InternalListener<Array<R>> = emptyListener;
public ils: Array<CombineListener<any>> = [];
public ac: number; // ac is "active count", num of streams still not completed
public left: number; // number of streams that still need to emit a value
public vals: Array<any>;
public vals: Array<R>;

constructor(public project: CombineProjectFunction,
public streams: Array<Stream<any>>) {
constructor(public streams: Array<Stream<any>>) {
const n = this.ac = this.left = streams.length;
const vals = this.vals = new Array(n);
for (let i = 0; i < n; i++) {
Expand All @@ -178,11 +206,14 @@ export class CombineProducer<R> implements InternalProducer<R> {
return left === 0;
}

_start(out: InternalListener<R>): void {
_start(out: InternalListener<Array<R>>): void {
this.out = out;
const s = this.streams;
const n = s.length;
if (n === 0) this.zero(out); else {
if (n === 0) {
out._n(this.vals);
out._c();
} else {
for (let i = 0; i < n; i++) {
s[i]._add(new CombineListener(i, this));
}
Expand All @@ -200,15 +231,6 @@ export class CombineProducer<R> implements InternalProducer<R> {
this.out = null;
this.ils = [];
}

zero(out: InternalListener<R>): void {
try {
out._n(this.project<R>());
out._c();
} catch (e) {
out._e(e);
}
}
}

export class FromArrayProducer<T> implements InternalProducer<T> {
Expand Down Expand Up @@ -258,55 +280,6 @@ export class FromPromiseProducer<T> implements InternalProducer<T> {
}
}

export class MergeProducer<T> implements InternalProducer<T>, InternalListener<T> {
public type = 'merge';
private out: InternalListener<T> = emptyListener;
private ac: number; // ac is activeCount, starts initialized

constructor(public streams: Array<Stream<T>>) {
this.ac = streams.length;
}

_start(out: InternalListener<T>): void {
this.out = out;
const s = this.streams;
const L = s.length;
for (let i = 0; i < L; i++) {
s[i]._add(this);
}
}

_stop(): void {
const s = this.streams;
const L = s.length;
for (let i = 0; i < L; i++) {
s[i]._remove(this);
}
this.out = null;
this.ac = L;
}

_n(t: T) {
const u = this.out;
if (!u) return;
u._n(t);
}

_e(err: any) {
const u = this.out;
if (!u) return;
u._e(err);
}

_c() {
if (--this.ac === 0) {
const u = this.out;
if (!u) return;
u._c();
}
}
}

export class PeriodicProducer implements InternalProducer<number> {
public type = 'periodic';
private intervalID: any = -1;
Expand Down Expand Up @@ -1241,40 +1214,33 @@ export class Stream<T> implements InternalListener<T> {
}

/**
* Combines multiple streams together to return a stream whose events are
* calculated from the latest events of each of the input streams.
* Combines multiple input streams together to return a stream whose events
* are arrays that collect the latest events from each input stream.
*
* *combine* remembers the most recent event from each of the input streams.
* When any of the input streams emits an event, that event together with all
* the other saved events are combined in the `project` function which should
* return a value. That value will be emitted on the output stream. It's
* essentially a way of mixing the events from multiple streams according to a
* formula.
* *combine* internally remembers the most recent event from each of the input
* streams. When any of the input streams emits an event, that event together
* with all the other saved events are combined into an array. That array will
* be emitted on the output stream. It's essentially a way of joining together
* the events from multiple streams.
*
* Marble diagram:
*
* ```text
* --1----2-----3--------4---
* ----a-----b-----c--d------
* combine((x,y) => x+y)
* combine
* ----1a-2a-2b-3b-3c-3d-4d--
* ```
*
* @factory true
* @param {Function} project A function of type `(x: T1, y: T2) => R` or
* similar that takes the most recent events `x` and `y` from the input
* streams and returns a value. The output stream will emit that value. The
* number of arguments for this function should match the number of input
* streams.
* @param {Stream} stream1 A stream to combine together with other streams.
* @param {Stream} stream2 A stream to combine together with other streams.
* Two or more streams may be given as arguments.
* Multiple streams, not just two, may be given as arguments.
* @return {Stream}
*/
static combine: CombineFactorySignature =
function combine<R>(project: CombineProjectFunction,
...streams: Array<Stream<any>>): Stream<R> {
return new Stream<R>(new CombineProducer<R>(project, streams));
static combine: CombineSignature = <CombineSignature>
function combine(...streams: Array<Stream<any>>): Stream<Array<any>> {
return new Stream<Array<any>>(new CombineProducer<any>(streams));
};

protected _map<U>(project: (t: T) => U): Stream<U> | MemoryStream<U> {
Expand Down

0 comments on commit a2aa0a6

Please sign in to comment.