Skip to content

Commit

Permalink
fix(sampleCombine): change API to fit compose() usage
Browse files Browse the repository at this point in the history
  • Loading branch information
staltz committed Sep 25, 2016
1 parent cff94da commit 38782d8
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 38 deletions.
79 changes: 64 additions & 15 deletions src/extra/sampleCombine.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,52 @@
import {CombineSignature, InternalListener, Operator, Stream} from '../core';
import {InternalListener, Operator, Stream} from '../core';

export interface SampleCombineSignature {
(): <T>(s: Stream<T>) => Stream<[T]>;
<T1>(s1: Stream<T1>): <T>(s: Stream<T>) => Stream<[T, T1]>;
<T1, T2>(
s1: Stream<T1>,
s2: Stream<T2>): <T>(s: Stream<T>) => Stream<[T, T1, T2]>;
<T1, T2, T3>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3]>;
<T1, T2, T3, T4>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>,
s4: Stream<T4>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4]>;
<T1, T2, T3, T4, T5>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>,
s4: Stream<T4>,
s5: Stream<T5>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4, T5]>;
<T1, T2, T3, T4, T5, T6>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>,
s4: Stream<T4>,
s5: Stream<T5>,
s6: Stream<T6>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4, T5, T6]>;
<T1, T2, T3, T4, T5, T6, T7>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>,
s4: Stream<T4>,
s5: Stream<T5>,
s6: Stream<T6>,
s7: Stream<T7>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4, T5, T6, T7]>;
<T1, T2, T3, T4, T5, T6, T7, T8>(
s1: Stream<T1>,
s2: Stream<T2>,
s3: Stream<T3>,
s4: Stream<T4>,
s5: Stream<T5>,
s6: Stream<T6>,
s7: Stream<T7>,
s8: Stream<T8>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4, T5, T6, T7, T8]>;
(...streams: Array<Stream<any>>): (s: Stream<any>) => Stream<Array<any>>;
}

export class SampleCombineListener<T> implements InternalListener<T> {
constructor(private i: number, private p: SampleCombineOperator<any>) {
Expand Down Expand Up @@ -75,6 +123,13 @@ export class SampleCombineOperator<T> implements Operator<T, Array<any>> {
}
}

let sampleCombine: SampleCombineSignature;
sampleCombine = function sampleCombine(...streams: Array<Stream<any>>) {
return function sampleCombineOperator(sampler: Stream<any>): Stream<Array<any>> {
return new Stream<Array<any>>(new SampleCombineOperator(sampler, streams));
};
} as SampleCombineSignature;

/**
* Combines a source stream with multiple other streams. The result stream
* will emit the latest events from all input streams, but only when the
Expand All @@ -90,10 +145,10 @@ export class SampleCombineOperator<T> implements Operator<T, Array<any>> {
* Marble diagram:
*
* ```text
* --1----2-----3--------4---
* ----a-----b-----c--d------
* --1----2-----3--------4--- (source)
* ----a-----b-----c--d------ (other)
* sampleCombine
* --1?---2a----3b-------4d--
* -------2a----3b-------4d--
* ```
*
* Examples:
Expand All @@ -105,7 +160,7 @@ export class SampleCombineOperator<T> implements Operator<T, Array<any>> {
* const sampler = xs.periodic(1000).take(3)
* const other = xs.periodic(100)
*
* const stream = sampleCombine(sampler, other)
* const stream = sampler.compose(sampleCombine(other))
*
* stream.addListener({
* next: i => console.log(i),
Expand All @@ -127,7 +182,7 @@ export class SampleCombineOperator<T> implements Operator<T, Array<any>> {
* const sampler = xs.periodic(1000).take(3)
* const other = xs.periodic(100).take(2)
*
* const stream = sampleCombine(sampler, other)
* const stream = sampler.compose(sampleCombine(other))
*
* stream.addListener({
* next: i => console.log(i),
Expand All @@ -142,14 +197,8 @@ export class SampleCombineOperator<T> implements Operator<T, Array<any>> {
* > [2, 1]
* ```
*
* @param {Stream} sampler The source stream of which to sample.
* @param {...Stream} streams One or more streams to combine.
* @param {...Stream} streams One or more streams to combine with the sampler
* stream.
* @return {Stream}
*/
let sampleCombine: CombineSignature;
sampleCombine = function(sampler: Stream<any>,
...streams: Array<Stream<any>>): Stream<Array<any>> {
return new Stream<Array<any>>(new SampleCombineOperator<any>(sampler, streams));
} as CombineSignature;

export default sampleCombine;
export default sampleCombine;
34 changes: 11 additions & 23 deletions tests/extra/sampleCombine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import * as assert from 'assert';

describe('sampleCombine (extra)', () => {
it('should combine AND-style two streams together', (done) => {
const stream1 = xs.periodic(100).take(3);
const stream1 = xs.periodic(100).take(3).startWith(null);
const stream2 = xs.periodic(99).take(3);
const stream = sampleCombine(stream1, stream2);
const stream = stream1.compose(sampleCombine(stream2));
let expected = [[0, 0], [1, 1], [2, 2]];
stream.addListener({
next: (x) => {
Expand All @@ -35,14 +35,15 @@ describe('sampleCombine (extra)', () => {
stop: () => {}
});

const combined: Stream<[string, string]> = sampleCombine(stream1, stream2);
const combined: Stream<[string, string]> = stream1
.compose(sampleCombine(stream2));
done();
});

it('should complete only when the sample stream has completed', (done) => {
const stream1 = xs.periodic(100).take(4);
const stream2 = xs.periodic(99).take(1);
const stream = sampleCombine(stream1, stream2).map(arr => arr.join(''));
const stream = stream1.compose(sampleCombine(stream2)).map(arr => arr.join(''));
let expected = ['00', '10', '20', '30'];
stream.addListener({
next: (x) => {
Expand All @@ -56,24 +57,9 @@ describe('sampleCombine (extra)', () => {
});
});

it('should emit an empty array if combining zero streams', (done) => {
const stream = sampleCombine();

stream.addListener({
next: (a) => {
assert.equal(Array.isArray(a), true);
assert.equal(a.length, 0);
},
error: done,
complete: () => {
done();
},
});
});

it('should just wrap the value if combining one stream', (done) => {
const source = xs.periodic(100).take(3);
const stream = sampleCombine(source);
const stream = source.compose(sampleCombine());
let expected = [[0], [1], [2]];

stream.addListener({
Expand All @@ -100,7 +86,9 @@ describe('sampleCombine (extra)', () => {
const arrayInners: Array<Stream<number>> = [];
const stream = outer
.map(x => {
return sampleCombine(...arrayInners)
const sampler = arrayInners[0];
const others = arrayInners.slice(1, 1000);
return sampler.compose(sampleCombine(...others))
.map(combination => `${x}${combination.join('')}`);
})
.flatten();
Expand Down Expand Up @@ -147,15 +135,15 @@ describe('sampleCombine (extra)', () => {
it('should return a Stream when combining a MemoryStream with a Stream', (done) => {
const input1 = xs.periodic(80).take(4).remember();
const input2 = xs.periodic(50).take(3);
const stream: Stream<[number, number]> = sampleCombine(input1, input2);
const stream: Stream<[number, number]> = input1.compose(sampleCombine(input2));
assert.strictEqual(stream instanceof Stream, true);
done();
});

it('should return a Stream when combining a MemoryStream with a MemoryStream', (done) => {
const input1 = xs.periodic(80).take(4).remember();
const input2 = xs.periodic(50).take(3).remember();
const stream: Stream<[number, number]> = sampleCombine(input1, input2);
const stream: Stream<[number, number]> = input1.compose(sampleCombine(input2));
assert.strictEqual(stream instanceof Stream, true);
done();
});
Expand Down

0 comments on commit 38782d8

Please sign in to comment.