From 65205500e797d0b94e28785ab3cf1b400d9930a3 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Fri, 13 May 2016 16:04:31 +0300 Subject: [PATCH] fix(flatten): fix map+flatten fusion to respect filter+map fusion Fix flatten() and flattenConcurrently() map fusion to not attempt fusion if the producer was FilterMapOperator, because that would hijack the project function while ignoring the passes function. --- src/core.ts | 4 +-- tests/operator/flatten.ts | 37 ++++++++++++++++++++++++++- tests/operator/flattenConcurrently.ts | 35 +++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/src/core.ts b/src/core.ts index 2545a08..f9e0306 100644 --- a/src/core.ts +++ b/src/core.ts @@ -1690,7 +1690,7 @@ export class Stream implements InternalListener { flatten(): T { const p = this._prod; return new Stream( - p instanceof MapOperator || p instanceof FilterMapOperator ? + p instanceof MapOperator && !(p instanceof FilterMapOperator) ? new MapFlattenOperator(>> p) : new FlattenOperator(>> this) ); @@ -1724,7 +1724,7 @@ export class Stream implements InternalListener { flattenConcurrently(): T { const p = this._prod; return new Stream( - p instanceof MapOperator || p instanceof FilterMapOperator ? + p instanceof MapOperator && !(p instanceof FilterMapOperator) ? new MapFlattenConcOperator(>> p) : new FlattenConcOperator(>> this) ); diff --git a/tests/operator/flatten.ts b/tests/operator/flatten.ts index dd750f0..6c52235 100644 --- a/tests/operator/flatten.ts +++ b/tests/operator/flatten.ts @@ -139,7 +139,42 @@ describe('Stream.prototype.flatten', () => { }); }); }); - + + describe('with filter+map fusion', () => { + it('should execute the predicate, the projection, and the flattening', (done) => { + let predicateCallCount = 0; + let projectCallCount = 0; + + const stream = xs.periodic(140).take(3) + .filter(i => { + predicateCallCount += 1; + return i % 2 === 0; + }) + .map(i => { + projectCallCount += 1; + return xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`); + }) + .flatten(); + // ---x---x---x---x---x---x---x---x---x---x---x---x + // ---00--01--02 + // ------------20-----------21----------22 + const expected = ['00', '01', '20', '21', '22']; + + stream.addListener({ + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + assert.equal(predicateCallCount, 3); + assert.equal(projectCallCount, 2); + done(); + } + }); + }); + }); + describe('with mapTo', () => { it('should have the correct \'type\' metadata on the operator producer', (done) => { const source: Stream> = xs.periodic(100).take(3) diff --git a/tests/operator/flattenConcurrently.ts b/tests/operator/flattenConcurrently.ts index f68234e..f4982fd 100644 --- a/tests/operator/flattenConcurrently.ts +++ b/tests/operator/flattenConcurrently.ts @@ -141,6 +141,41 @@ describe('Stream.prototype.flattenConcurrently', () => { }); }); + describe('with filter+map fusion', () => { + it('should execute the predicate, the projection, and the flattening', (done) => { + let predicateCallCount = 0; + let projectCallCount = 0; + + const stream = xs.periodic(140).take(3) + .filter(i => { + predicateCallCount += 1; + return i % 2 === 0; + }) + .map(i => { + projectCallCount += 1; + return xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`); + }) + .flattenConcurrently(); + // ---x---x---x---x---x---x---x---x---x---x---x---x + // ---00--01--02 + // ------------20-----------21----------22 + const expected = ['00', '01', '02', '20', '21', '22']; + + stream.addListener({ + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + assert.equal(predicateCallCount, 3); + assert.equal(projectCallCount, 2); + done(); + } + }); + }); + }); + describe('with mapTo', () => { it('should have the correct \'type\' metadata on the operator producer', (done) => { const source: Stream> = xs.periodic(100).take(3)