Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(delayWhen): delayWhen's delayDurationSelector should support ObservableInput #7049

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 7 additions & 3 deletions spec-dtslint/operators/delayWhen-spec.ts
Expand Up @@ -10,20 +10,24 @@ it('should support an empty notifier', () => {
const o = of(1, 2, 3).pipe(delayWhen(() => NEVER)); // $ExpectType Observable<number>
});

it('should support a subscriptiondelayWhen parameter', () => {
it('should support a subscriptionDelay parameter', () => {
const o = of(1, 2, 3).pipe(delayWhen(() => of('a', 'b', 'c'), of(new Date()))); // $ExpectType Observable<number>
});

it('should enforce types', () => {
const o = of(1, 2, 3).pipe(delayWhen()); // $ExpectError
});

it('should enforce types of delayWhenDurationSelector', () => {
it('should enforce types of delayDurationSelector', () => {
const o = of(1, 2, 3).pipe(delayWhen(of('a', 'b', 'c'))); // $ExpectError
const p = of(1, 2, 3).pipe(delayWhen((value: string, index) => of('a', 'b', 'c'))); // $ExpectError
const q = of(1, 2, 3).pipe(delayWhen((value, index: string) => of('a', 'b', 'c'))); // $ExpectError
});

it('should enforce types of subscriptiondelayWhen', () => {
it('should enforce types of subscriptionDelay', () => {
const o = of(1, 2, 3).pipe(delayWhen(() => of('a', 'b', 'c'), 'a')); // $ExpectError
});

it('should support Promises', () => {
const o = of(1, 2, 3).pipe(delayWhen(() => Promise.resolve('a'))); // $ExpectType Observable<number>
});
4 changes: 2 additions & 2 deletions spec/operators/audit-spec.ts
Expand Up @@ -395,7 +395,7 @@ describe('audit operator', () => {
});

it('should audit by promise resolves', (done) => {
const e1 = interval(10).pipe(take(5));
const e1 = interval(1).pipe(take(5));
const expected = [0, 1, 2, 3, 4];

e1.pipe(audit(() => Promise.resolve(42))).subscribe({
Expand All @@ -413,7 +413,7 @@ describe('audit operator', () => {
});

it('should raise error when promise rejects', (done) => {
const e1 = interval(10).pipe(take(10));
const e1 = interval(1).pipe(take(10));
const expected = [0, 1, 2];
const error = new Error('error');

Expand Down
40 changes: 39 additions & 1 deletion spec/operators/delayWhen-spec.ts
@@ -1,4 +1,4 @@
import { of, EMPTY } from 'rxjs';
import { of, EMPTY, interval, take } from 'rxjs';
import { delayWhen, tap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
Expand Down Expand Up @@ -338,4 +338,42 @@ describe('delayWhen', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should delayWhen Promise resolves', (done) => {
const e1 = interval(1).pipe(take(5));
const expected = [0, 1, 2, 3, 4];

e1.pipe(delayWhen(() => Promise.resolve(42))).subscribe({
next: (x: number) => {
expect(x).to.equal(expected.shift());
},
error: () => {
done(new Error('should not be called'));
},
complete: () => {
expect(expected.length).to.equal(0);
done();
},
});
});

it('should raise error when Promise rejects', (done) => {
const e1 = interval(1).pipe(take(10));
const expected = [0, 1, 2];
const error = new Error('err');

e1.pipe(delayWhen((x) => (x === 3 ? Promise.reject(error) : Promise.resolve(42)))).subscribe({
next: (x: number) => {
expect(x).to.equal(expected.shift());
},
error: (err: any) => {
expect(err).to.be.an('error');
expect(expected.length).to.equal(0);
done();
},
complete: () => {
done(new Error('should not be called'));
},
});
});
});
29 changes: 16 additions & 13 deletions src/internal/operators/delayWhen.ts
@@ -1,17 +1,18 @@
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { concat } from '../observable/concat';
import { take } from './take';
import { ignoreElements } from './ignoreElements';
import { mapTo } from './mapTo';
import { mergeMap } from './mergeMap';
import { innerFrom } from '../observable/innerFrom';

/** @deprecated The `subscriptionDelay` parameter will be removed in v8. */
export function delayWhen<T>(
delayDurationSelector: (value: T, index: number) => Observable<any>,
delayDurationSelector: (value: T, index: number) => ObservableInput<any>,
subscriptionDelay: Observable<any>
): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => ObservableInput<any>): MonoTypeOperatorFunction<T>;

/**
* Delays the emission of items from the source Observable by a given time span
Expand All @@ -26,8 +27,9 @@ export function delayWhen<T>(delayDurationSelector: (value: T, index: number) =>
* a time span determined by another Observable. When the source emits a value,
* the `delayDurationSelector` function is called with the value emitted from
* the source Observable as the first argument to the `delayDurationSelector`.
* The `delayDurationSelector` function should return an Observable, called
* the "duration" Observable.
* The `delayDurationSelector` function should return an {@link ObservableInput},
* that is internally converted to an Observable that is called the "duration"
* Observable.
*
* The source value is emitted on the output Observable only when the "duration"
* Observable emits ({@link guide/glossary-and-semantics#next next}s) any value.
Expand Down Expand Up @@ -76,18 +78,19 @@ export function delayWhen<T>(delayDurationSelector: (value: T, index: number) =>
* @see {@link audit}
* @see {@link auditTime}
*
* @param {function(value: T, index: number): Observable} delayDurationSelector A function that
* returns an Observable for each value emitted by the source Observable, which
* is then used to delay the emission of that item on the output Observable
* until the Observable returned from this function emits a value.
* @param {Observable} subscriptionDelay An Observable that triggers the
* subscription to the source Observable once it emits any value.
* @param delayDurationSelector A function that returns an `ObservableInput` for
* each `value` emitted by the source Observable, which is then used to delay the
* emission of that `value` on the output Observable until the `ObservableInput`
* returned from this function emits a next value. When called, beside `value`,
* this function receives a zero-based `index` of the emission order.
* @param subscriptionDelay An Observable that triggers the subscription to the
* source Observable once it emits any value.
* @return A function that returns an Observable that delays the emissions of
* the source Observable by an amount of time specified by the Observable
* returned by `delayDurationSelector`.
*/
export function delayWhen<T>(
delayDurationSelector: (value: T, index: number) => Observable<any>,
delayDurationSelector: (value: T, index: number) => ObservableInput<any>,
subscriptionDelay?: Observable<any>
): MonoTypeOperatorFunction<T> {
if (subscriptionDelay) {
Expand All @@ -96,5 +99,5 @@ export function delayWhen<T>(
concat(subscriptionDelay.pipe(take(1), ignoreElements()), source.pipe(delayWhen(delayDurationSelector)));
}

return mergeMap((value, index) => delayDurationSelector(value, index).pipe(take(1), mapTo(value)));
return mergeMap((value, index) => innerFrom(delayDurationSelector(value, index)).pipe(take(1), mapTo(value)));
}