Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(operator): Add repeatWhen operator #1911

Merged
merged 1 commit into from Sep 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion doc/decision-tree-widget/tree.yml
Expand Up @@ -286,7 +286,12 @@ children:
children:
- label: I want to re-subscribe
children:
- label: repeat
- label: immediately
children:
- label: repeat
- label: when another Observable emits
children:
- label: repeatWhen
- label: I want to start a new Observable
children:
- label: concat
Expand Down
1 change: 1 addition & 0 deletions doc/operators.md
Expand Up @@ -125,6 +125,7 @@ There are operators for different purposes, and they may be categorized as: crea
- [`never`](../class/es6/Observable.js~Observable.html#static-method-never)
- [`of`](../class/es6/Observable.js~Observable.html#static-method-of)
- `repeat`
- `repeatWhen`
- [`range`](../class/es6/Observable.js~Observable.html#static-method-range)
- [`throw`](../class/es6/Observable.js~Observable.html#static-method-throw)
- [`timer`](../class/es6/Observable.js~Observable.html#static-method-timer)
Expand Down
323 changes: 323 additions & 0 deletions spec/operators/repeatWhen-spec.ts
@@ -0,0 +1,323 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Observable = Rx.Observable;

/** @test {repeatWhen} */
describe('Observable.prototype.repeatWhen', () => {
asDiagram('repeatWhen')('should handle a source with eventual complete using a hot notifier', () => {
const source = cold('-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ !'];
const notifier = hot('-------------r------------r-|');
const expected = '-1--2---------1--2---------1|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should handle a source with eventual complete using a hot notifier that raises error', () => {
const source = cold( '-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ ! '];
const notifier = hot('-----------r-------r---------#');
const expected = '-1--2-------1--2----1--2-----#';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should retry when notified via returned notifier on complete', (done: MochaDone) => {
let retried = false;
const expected = [1, 2, 1, 2];
let i = 0;
Observable.of(1, 2)
.map((n: number) => {
return n;
})
.repeatWhen((notifications: any) => notifications.map((x: any) => {
if (retried) {
throw new Error('done');
}
retried = true;
return x;
}))
.subscribe((x: any) => {
expect(x).to.equal(expected[i++]);
},
(err: any) => {
expect(err).to.be.an('error', 'done');
done();
});
});

it('should retry when notified and complete on returned completion', (done: MochaDone) => {
const expected = [1, 2, 1, 2];
Observable.of(1, 2)
.map((n: number) => {
return n;
})
.repeatWhen((notifications: any) => Observable.empty())
.subscribe((n: number) => {
expect(n).to.equal(expected.shift());
}, (err: any) => {
done(new Error('should not be called'));
}, () => {
done();
});
});

it('should apply an empty notifier on an empty source', () => {
const source = cold( '|');
const subs = '(^!)';
const notifier = cold('|');
const expected = '|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should apply a never notifier on an empty source', () => {
const source = cold( '|');
const subs = '(^!)';
const notifier = cold('-');
const expected = '-';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should apply an empty notifier on a never source', () => {
const source = cold( '-');
const unsub = ' !';
const subs = '^ !';
const notifier = cold('|');
const expected = '-';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should apply a never notifier on a never source', () => {
const source = cold( '-');
const unsub = ' !';
const subs = '^ !';
const notifier = cold('-');
const expected = '-';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should return an empty observable given a just-throw source and empty notifier', () => {
const source = cold( '#');
const notifier = cold('|');
const expected = '#';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
});

it('should return a error observable given a just-throw source and never notifier', () => {
const source = cold( '#');
const notifier = cold('-');
const expected = '#';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
});

xit('should hide errors using a never notifier on a source with eventual error', () => {
const source = cold( '--a--b--c--#');
const subs = '^ !';
const notifier = cold( '-');
const expected = '--a--b--c---------------------------------';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

xit('should propagate error thrown from notifierSelector function', () => {
const source = cold('--a--b--c--|');
const subs = '^ !';
const expected = '--a--b--c--#';

const result = source.repeatWhen(<any>(() => { throw 'bad!'; }));

expectObservable(result).toBe(expected, undefined, 'bad!');
expectSubscriptions(source.subscriptions).toBe(subs);
});

xit('should replace error with complete using an empty notifier on a source ' +
'with eventual error', () => {
const source = cold( '--a--b--c--#');
const subs = '^ !';
const notifier = cold( '|');
const expected = '--a--b--c--|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should mirror a basic cold source with complete, given a never notifier', () => {
const source = cold( '--a--b--c--|');
const subs = '^ !';
const notifier = cold( '|');
const expected = '--a--b--c--|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should mirror a basic cold source with no termination, given a never notifier', () => {
const source = cold( '--a--b--c---');
const subs = '^ ';
const notifier = cold( '|');
const expected = '--a--b--c---';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should mirror a basic hot source with complete, given a never notifier', () => {
const source = hot('-a-^--b--c--|');
const subs = '^ !';
const notifier = cold( '|');
const expected = '---b--c--|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

xit('should handle a hot source that raises error but eventually completes', () => {
const source = hot('-1--2--3----4--5---|');
const ssubs = ['^ ! ',
' ^ !'];
const notifier = hot('--------------r--------r---r--r--r---|');
const nsubs = ' ^ !';
const expected = '-1--2--- -5---|';

const result = source
.map((x: string) => {
if (x === '3') {
throw 'error';
}
return x;
}).repeatWhen(() => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(ssubs);
expectSubscriptions(notifier.subscriptions).toBe(nsubs);
});

it('should tear down resources when result is unsubscribed early', () => {
const source = cold( '-1--2--|');
const unsub = ' ! ';
const subs = ['^ ! ',
' ^ ! ',
' ^ ! '];
const notifier = hot('---------r-------r---------#');
const nsubs = ' ^ ! ';
const expected = '-1--2-----1--2----1-- ';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(notifier.subscriptions).toBe(nsubs);
});

it('should not break unsubscription chains when unsubscribed explicitly', () => {
const source = cold( '-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ ! '];
const notifier = hot('---------r-------r-------r-#');
const nsubs = ' ^ ! ';
const expected = '-1--2-----1--2----1-- ';
const unsub = ' ! ';

const result = source
.mergeMap((x: string) => Observable.of(x))
.repeatWhen((notifications: any) => notifier)
.mergeMap((x: string) => Observable.of(x));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(notifier.subscriptions).toBe(nsubs);
});

it('should handle a source with eventual error using a dynamic notifier ' +
'selector which eventually throws', () => {
const source = cold('-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-1--2---1--2---1--2--#';

let invoked = 0;
const result = source.repeatWhen((notifications: any) =>
notifications.map((err: any) => {
if (++invoked === 3) {
throw 'error';
} else {
return 'x';
}
}));

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should handle a source with eventual error using a dynamic notifier ' +
'selector which eventually completes', () => {
const source = cold('-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-1--2---1--2---1--2--|';

let invoked = 0;
const result = source.repeatWhen((notifications: any) => notifications
.map(() => 'x')
.takeUntil(
notifications.flatMap(() => {
if (++invoked < 3) {
return Observable.empty();
} else {
return Observable.of('stop!');
}
})
));

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
1 change: 1 addition & 0 deletions src/Rx.ts
Expand Up @@ -104,6 +104,7 @@ import './add/operator/publishLast';
import './add/operator/race';
import './add/operator/reduce';
import './add/operator/repeat';
import './add/operator/repeatWhen';
import './add/operator/retry';
import './add/operator/retryWhen';
import './add/operator/sample';
Expand Down
11 changes: 11 additions & 0 deletions src/add/operator/repeatWhen.ts
@@ -0,0 +1,11 @@

import { Observable } from '../../Observable';
import { repeatWhen, RepeatWhenSignature } from '../../operator/repeatWhen';

Observable.prototype.repeatWhen = repeatWhen;

declare module '../../Observable' {
interface Observable<T> {
repeatWhen: RepeatWhenSignature<T>;
}
}