Skip to content

Commit

Permalink
fix(flatten): fix map+flatten fusion to respect filter+map fusion
Browse files Browse the repository at this point in the history
Fix flatten() and flattenConcurrently() map fusion to not attempt fusion if the producer was
FilterMapOperator, because that would hijack the project function while ignoring the passes
function.
  • Loading branch information
staltz committed May 13, 2016
1 parent 0aac03d commit 6520550
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1690,7 +1690,7 @@ export class Stream<T> implements InternalListener<T> {
flatten<R>(): T {
const p = this._prod;
return <T> <any> new Stream<R>(
p instanceof MapOperator || p instanceof FilterMapOperator ?
p instanceof MapOperator && !(p instanceof FilterMapOperator) ?
new MapFlattenOperator(<MapOperator<any, Stream<R>>> <any> p) :
new FlattenOperator(<Stream<Stream<R>>> <any> this)
);
Expand Down Expand Up @@ -1724,7 +1724,7 @@ export class Stream<T> implements InternalListener<T> {
flattenConcurrently<R>(): T {
const p = this._prod;
return <T> <any> new Stream<R>(
p instanceof MapOperator || p instanceof FilterMapOperator ?
p instanceof MapOperator && !(p instanceof FilterMapOperator) ?
new MapFlattenConcOperator(<MapOperator<any, Stream<R>>> <any> p) :
new FlattenConcOperator(<Stream<Stream<R>>> <any> this)
);
Expand Down
37 changes: 36 additions & 1 deletion tests/operator/flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,42 @@ describe('Stream.prototype.flatten', () => {
});
});
});


describe('with filter+map fusion', () => {
it('should execute the predicate, the projection, and the flattening', (done) => {
let predicateCallCount = 0;
let projectCallCount = 0;

const stream = xs.periodic(140).take(3)
.filter(i => {
predicateCallCount += 1;
return i % 2 === 0;
})
.map(i => {
projectCallCount += 1;
return xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`);
})
.flatten();
// ---x---x---x---x---x---x---x---x---x---x---x---x
// ---00--01--02
// ------------20-----------21----------22
const expected = ['00', '01', '20', '21', '22'];

stream.addListener({
next: (x: string) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
assert.equal(predicateCallCount, 3);
assert.equal(projectCallCount, 2);
done();
}
});
});
});

describe('with mapTo', () => {
it('should have the correct \'type\' metadata on the operator producer', (done) => {
const source: Stream<Stream<number>> = xs.periodic(100).take(3)
Expand Down
35 changes: 35 additions & 0 deletions tests/operator/flattenConcurrently.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,41 @@ describe('Stream.prototype.flattenConcurrently', () => {
});
});

describe('with filter+map fusion', () => {
it('should execute the predicate, the projection, and the flattening', (done) => {
let predicateCallCount = 0;
let projectCallCount = 0;

const stream = xs.periodic(140).take(3)
.filter(i => {
predicateCallCount += 1;
return i % 2 === 0;
})
.map(i => {
projectCallCount += 1;
return xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`);
})
.flattenConcurrently();
// ---x---x---x---x---x---x---x---x---x---x---x---x
// ---00--01--02
// ------------20-----------21----------22
const expected = ['00', '01', '02', '20', '21', '22'];

stream.addListener({
next: (x: string) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
assert.equal(predicateCallCount, 3);
assert.equal(projectCallCount, 2);
done();
}
});
});
});

describe('with mapTo', () => {
it('should have the correct \'type\' metadata on the operator producer', (done) => {
const source: Stream<Stream<number>> = xs.periodic(100).take(3)
Expand Down

0 comments on commit 6520550

Please sign in to comment.