Skip to content

Commit

Permalink
implement Source.filterAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
srijs committed May 24, 2016
1 parent 83bddb9 commit e376ffd
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/source-spec.ts
Expand Up @@ -161,6 +161,20 @@ describe('Stream', () => {

});

describe('filterAsync', () => {

it('produces an empty source from an empty source', () => {
const src = Source.empty();
return chai.expect(src.filterAsync(() => Promise.resolve(true)).toArray()).to.eventually.deep.equal([]);
});

it('only produces elements for which the predicate returns true', () => {
const src = Source.fromArray([1,2,3,4,5,6]);
return chai.expect(src.filterAsync(x => Promise.resolve(x % 2 === 0)).toArray()).to.eventually.deep.equal([2,4,6]);
});

});

describe('intoOutputStream', () => {

it('writes no data when the source is empty', () => {
Expand Down
15 changes: 15 additions & 0 deletions src/source.ts
Expand Up @@ -120,6 +120,21 @@ export class Source<Output> {
});
}

filterAsync(pred: (output: Output) => Promise<boolean>): Source<Output> {
return new Source(<State, Result>(sink: SinkInterface<Output, State, Result>) => {
return this.pipe<State, Result>({
onStart: () => sink.onStart(),
onData: (state, output) => pred(output).then(res => {
if (!res) {
return Promise.resolve(state);
}
return sink.onData(state, output);
}),
onEnd: (state) => sink.onEnd(state)
});
});
}

toArray(): Promise<Array<Output>> {
return this.fold([], (arr, outp) => arr.concat([outp]));
}
Expand Down

0 comments on commit e376ffd

Please sign in to comment.