diff --git a/perf/combine.js b/perf/combine.js index 3b4b6b1..d8fb466 100644 --- a/perf/combine.js +++ b/perf/combine.js @@ -30,6 +30,9 @@ var options = { function add3(a, b, c) { return a + b + c; } +function add3Arr(arr) { + return arr[0] + arr[1] + arr[2]; +} var xs1 = xs.fromArray(a); var xs2 = xs.fromArray(a); @@ -46,7 +49,7 @@ var rx3 = rxjs.Observable.from(a); suite .add('xstream', function(deferred) { runners.runXStream(deferred, - xs.combine(add3, xs1, xs2, xs3).filter(even)); + xs.combine(xs1, xs2, xs3).map(add3Arr).filter(even)); }, options) .add('most', function(deferred) { runners.runMost(deferred, diff --git a/src/core.ts b/src/core.ts index 4ed1a20..22b442a 100644 --- a/src/core.ts +++ b/src/core.ts @@ -62,18 +62,6 @@ function internalizeProducer(producer: Producer) { (> ( producer))._stop = producer.stop; } -function invoke(f: Function, args: Array) { - switch (args.length) { - case 0: return f(); - case 1: return f(args[0]); - case 2: return f(args[0], args[1]); - case 3: return f(args[0], args[1], args[2]); - case 4: return f(args[0], args[1], args[2], args[3]); - case 5: return f(args[0], args[1], args[2], args[3], args[4]); - default: return f.apply(void 0, args); - } -} - function compose2(f1: (t: T) => any, f2: (t: T) => any): (t: T) => any { return function composedFn(arg: T): any { return f1(f2(arg)); @@ -86,39 +74,84 @@ function and(f1: (t: T) => boolean, f2: (t: T) => boolean): (t: T) => boolean }; } -export interface CombineProjectFunction { - (v1: T1, v2: T2): R; - (v1: T1, v2: T2, v3: T3): R; - (v1: T1, v2: T2, v3: T3, v4: T4): R; - (v1: T1, v2: T2, v3: T3, v4: T4, v5: T5): R; - (v1: T1, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6): R; - (...values: Array): R; +export class MergeProducer implements InternalProducer, InternalListener { + public type = 'merge'; + private out: InternalListener = emptyListener; + private ac: number; // ac is activeCount, starts initialized + + constructor(public streams: Array>) { + this.ac = streams.length; + } + + _start(out: InternalListener): void { + this.out = out; + const s = this.streams; + const L = s.length; + for (let i = 0; i < L; i++) { + s[i]._add(this); + } + } + + _stop(): void { + const s = this.streams; + const L = s.length; + for (let i = 0; i < L; i++) { + s[i]._remove(this); + } + this.out = null; + this.ac = L; + } + + _n(t: T) { + const u = this.out; + if (!u) return; + u._n(t); + } + + _e(err: any) { + const u = this.out; + if (!u) return; + u._e(err); + } + + _c() { + if (--this.ac === 0) { + const u = this.out; + if (!u) return; + u._c(); + } + } } -export interface CombineFactorySignature { - ( - project: (t1: T1, t2: T2) => R, - stream1: Stream, - stream2: Stream): Stream; - ( - project: (t1: T1, t2: T2, t3: T3) => R, - stream1: Stream, - stream2: Stream, - stream3: Stream): Stream; - ( - project: (t1: T1, t2: T2, t3: T3, t4: T4) => R, - stream1: Stream, - stream2: Stream, - stream3: Stream, - stream4: Stream): Stream; - ( - project: (t1: T1, t2: T2, t3: T3, t4: T4, t5: T5) => R, - stream1: Stream, - stream2: Stream, - stream3: Stream, - stream4: Stream, - stream5: Stream): Stream; - (project: (...args: Array) => R, ...streams: Array>): Stream; +export interface CombineSignature { + (): Stream>; + (s1: Stream): Stream<[T1]>; + ( + s1: Stream, + s2: Stream): Stream<[T1, T2]>; + ( + s1: Stream, + s2: Stream, + s3: Stream): Stream<[T1, T2, T3]>; + ( + s1: Stream, + s2: Stream, + s3: Stream, + s4: Stream): Stream<[T1, T2, T3, T4]>; + ( + s1: Stream, + s2: Stream, + s3: Stream, + s4: Stream, + s5: Stream): Stream<[T1, T2, T3, T4, T5]>; + ( + s1: Stream, + s2: Stream, + s3: Stream, + s4: Stream, + s5: Stream, + s6: Stream): Stream<[T1, T2, T3, T4, T5, T6]>; + (...stream: Array>): Stream>; } export class CombineListener implements InternalListener { @@ -131,11 +164,7 @@ export class CombineListener implements InternalListener { const p = this.p, out = p.out; if (!out) return; if (p.up(t, this.i)) { - try { - out._n(invoke(p.project, p.vals)); - } catch (e) { - out._e(e); - } + out._n(p.vals); } } @@ -154,16 +183,15 @@ export class CombineListener implements InternalListener { } } -export class CombineProducer implements InternalProducer { +export class CombineProducer implements InternalProducer> { public type = 'combine'; - public out: InternalListener = emptyListener; + public out: InternalListener> = emptyListener; public ils: Array> = []; public ac: number; // ac is "active count", num of streams still not completed public left: number; // number of streams that still need to emit a value - public vals: Array; + public vals: Array; - constructor(public project: CombineProjectFunction, - public streams: Array>) { + constructor(public streams: Array>) { const n = this.ac = this.left = streams.length; const vals = this.vals = new Array(n); for (let i = 0; i < n; i++) { @@ -178,11 +206,14 @@ export class CombineProducer implements InternalProducer { return left === 0; } - _start(out: InternalListener): void { + _start(out: InternalListener>): void { this.out = out; const s = this.streams; const n = s.length; - if (n === 0) this.zero(out); else { + if (n === 0) { + out._n(this.vals); + out._c(); + } else { for (let i = 0; i < n; i++) { s[i]._add(new CombineListener(i, this)); } @@ -200,15 +231,6 @@ export class CombineProducer implements InternalProducer { this.out = null; this.ils = []; } - - zero(out: InternalListener): void { - try { - out._n(this.project()); - out._c(); - } catch (e) { - out._e(e); - } - } } export class FromArrayProducer implements InternalProducer { @@ -258,55 +280,6 @@ export class FromPromiseProducer implements InternalProducer { } } -export class MergeProducer implements InternalProducer, InternalListener { - public type = 'merge'; - private out: InternalListener = emptyListener; - private ac: number; // ac is activeCount, starts initialized - - constructor(public streams: Array>) { - this.ac = streams.length; - } - - _start(out: InternalListener): void { - this.out = out; - const s = this.streams; - const L = s.length; - for (let i = 0; i < L; i++) { - s[i]._add(this); - } - } - - _stop(): void { - const s = this.streams; - const L = s.length; - for (let i = 0; i < L; i++) { - s[i]._remove(this); - } - this.out = null; - this.ac = L; - } - - _n(t: T) { - const u = this.out; - if (!u) return; - u._n(t); - } - - _e(err: any) { - const u = this.out; - if (!u) return; - u._e(err); - } - - _c() { - if (--this.ac === 0) { - const u = this.out; - if (!u) return; - u._c(); - } - } -} - export class PeriodicProducer implements InternalProducer { public type = 'periodic'; private intervalID: any = -1; @@ -1241,40 +1214,33 @@ export class Stream implements InternalListener { } /** - * Combines multiple streams together to return a stream whose events are - * calculated from the latest events of each of the input streams. + * Combines multiple input streams together to return a stream whose events + * are arrays that collect the latest events from each input stream. * - * *combine* remembers the most recent event from each of the input streams. - * When any of the input streams emits an event, that event together with all - * the other saved events are combined in the `project` function which should - * return a value. That value will be emitted on the output stream. It's - * essentially a way of mixing the events from multiple streams according to a - * formula. + * *combine* internally remembers the most recent event from each of the input + * streams. When any of the input streams emits an event, that event together + * with all the other saved events are combined into an array. That array will + * be emitted on the output stream. It's essentially a way of joining together + * the events from multiple streams. * * Marble diagram: * * ```text * --1----2-----3--------4--- * ----a-----b-----c--d------ - * combine((x,y) => x+y) + * combine * ----1a-2a-2b-3b-3c-3d-4d-- * ``` * * @factory true - * @param {Function} project A function of type `(x: T1, y: T2) => R` or - * similar that takes the most recent events `x` and `y` from the input - * streams and returns a value. The output stream will emit that value. The - * number of arguments for this function should match the number of input - * streams. * @param {Stream} stream1 A stream to combine together with other streams. * @param {Stream} stream2 A stream to combine together with other streams. - * Two or more streams may be given as arguments. + * Multiple streams, not just two, may be given as arguments. * @return {Stream} */ - static combine: CombineFactorySignature = - function combine(project: CombineProjectFunction, - ...streams: Array>): Stream { - return new Stream(new CombineProducer(project, streams)); + static combine: CombineSignature = + function combine(...streams: Array>): Stream> { + return new Stream>(new CombineProducer(streams)); }; protected _map(project: (t: T) => U): Stream | MemoryStream { diff --git a/tests/factory/combine.ts b/tests/factory/combine.ts index d1b6b8a..c0ac7d9 100644 --- a/tests/factory/combine.ts +++ b/tests/factory/combine.ts @@ -7,11 +7,13 @@ describe('xs.combine', () => { it('should combine AND-style two streams together', (done) => { const stream1 = xs.periodic(100).take(2); const stream2 = xs.periodic(120).take(2); - const stream = xs.combine((x, y) => `${x}${y}`, stream1, stream2); - let expected = ['00', '10', '11']; + const stream = xs.combine(stream1, stream2); + let expected = [[0,0], [1,0], [1,1]]; stream.addListener({ next: (x) => { - assert.equal(x, expected.shift()); + const e = expected.shift(); + assert.equal(x[0], e[0]); + assert.equal(x[1], e[1]); }, error: done, complete: () => { @@ -32,17 +34,14 @@ describe('xs.combine', () => { stop: () => {} }); - const combined: Stream = xs.combine( - (a, b) => a.slice(2) + b.slice(2), - stream1, stream2 - ); + const combined: Stream<[string, string]> = xs.combine(stream1, stream2); done(); }); it('should complete only when all member streams have completed', (done) => { const stream1 = xs.periodic(30).take(1); const stream2 = xs.periodic(50).take(4); - const stream = xs.combine((x, y) => `${x}${y}`, stream1, stream2); + const stream = xs.combine(stream1, stream2).map(arr => arr.join('')) let expected = ['00', '01', '02', '03']; stream.addListener({ next: (x) => { @@ -56,36 +55,16 @@ describe('xs.combine', () => { }); }); - it('should propagate user mistakes in project as errors', (done) => { - const stream1 = xs.periodic(30).take(1); - const stream2 = xs.periodic(50).take(4); - const stream = xs.combine( - (x, y) => ( x).toLowerCase(), - stream1, stream2 - ); - stream.addListener({ - next: () => done('next should not be called'), - error: (err) => { - assert.notStrictEqual(err.message.match(/is not a function$/), null); - done(); - }, - complete: () => { - done('complete should not be called'); - }, - }); - }); - - it('should handle a group of zero streams', (done) => { - const stream = xs.combine(() => 'hi'); - let expected = ['hi']; + it('should emit an empty array if combining zero streams', (done) => { + const stream = xs.combine(); stream.addListener({ - next: (x) => { - assert.equal(x, expected.shift()); + next: (a) => { + assert.equal(Array.isArray(a), true); + assert.equal(a.length, 0); }, error: done, complete: () => { - assert.equal(expected.length, 0); done(); }, }); @@ -101,10 +80,8 @@ describe('xs.combine', () => { const arrayInners: Array> = []; const stream = outer .map(x => { - return xs.combine( - (...args: Array) => '' + x + args.join(''), - ...arrayInners - ); + return xs.combine(...arrayInners) + .map(combination => `${x}${combination.join('')}`); }) .flatten(); const expected = ['00']; @@ -150,7 +127,7 @@ describe('xs.combine', () => { it('should return a Stream when combining a MemoryStream with a Stream', (done) => { const input1 = xs.periodic(50).take(4).remember(); const input2 = xs.periodic(80).take(3); - const stream: Stream = xs.combine((x, y) => x + y, input1, input2); + const stream: Stream<[number, number]> = xs.combine(input1, input2); assert.strictEqual(stream instanceof Stream, true); done(); }); @@ -158,7 +135,7 @@ describe('xs.combine', () => { it('should return a Stream when combining a MemoryStream with a MemoryStream', (done) => { const input1 = xs.periodic(50).take(4).remember(); const input2 = xs.periodic(80).take(3).remember(); - const stream: Stream = xs.combine((x, y) => x + y, input1, input2); + const stream: Stream<[number, number]> = xs.combine(input1, input2); assert.strictEqual(stream instanceof Stream, true); done(); });