Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix teardown issues with synchronous inner-observables #4037

Merged
29 changes: 27 additions & 2 deletions spec/operators/catch-spec.ts
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { concat, Observable, of, throwError, EMPTY, from } from 'rxjs';
import { catchError, map, mergeMap } from 'rxjs/operators';
import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs';
import { catchError, map, mergeMap, takeWhile } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import * as sinon from 'sinon';
import { createObservableInputs } from '../helpers/test-helper';
Expand Down Expand Up @@ -121,6 +121,31 @@ describe('catchError operator', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

throwError(new Error('Some error')).pipe(
catchError(() => synchronousObservable),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should catch error and replace it with a hot Observable', () => {
const e1 = hot('--a--b--# ');
const e1subs = '^ ! ';
Expand Down
29 changes: 27 additions & 2 deletions spec/operators/exhaustMap-spec.ts
@@ -1,6 +1,6 @@
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { Observable, of, from } from 'rxjs';
import { exhaustMap, mergeMap } from 'rxjs/operators';
import { concat, defer, Observable, of, from } from 'rxjs';
import { exhaustMap, mergeMap, takeWhile } from 'rxjs/operators';
import { expect } from 'chai';

declare function asDiagram(arg: string): Function;
Expand Down Expand Up @@ -202,6 +202,31 @@ describe('exhaustMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

of(null).pipe(
exhaustMap(() => synchronousObservable),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should switch inner cold observables, inner never completes', () => {
const x = cold( '--a--b--c--| ');
const xsubs = ' ^ ! ';
Expand Down
30 changes: 28 additions & 2 deletions spec/operators/mergeScan-spec.ts
@@ -1,7 +1,8 @@
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { TestScheduler } from 'rxjs/testing';
import { of, EMPTY, NEVER, concat, throwError } from 'rxjs';
import { mergeScan, delay, mergeMap } from 'rxjs/operators';
import { of, defer, EMPTY, NEVER, concat, throwError } from 'rxjs';
import { mergeScan, delay, mergeMap, takeWhile } from 'rxjs/operators';
import { expect } from 'chai';

declare const rxTestScheduler: TestScheduler;
/** @test {mergeScan} */
Expand Down Expand Up @@ -136,6 +137,31 @@ describe('mergeScan', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

of(null).pipe(
mergeScan(() => synchronousObservable, 0),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should handle errors in the projection function', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
Expand Down
29 changes: 27 additions & 2 deletions spec/operators/onErrorResumeNext-spec.ts
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { onErrorResumeNext } from 'rxjs/operators';
import { concat, throwError, of } from 'rxjs';
import { onErrorResumeNext, takeWhile } from 'rxjs/operators';
import { concat, defer, throwError, of } from 'rxjs';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -104,6 +104,31 @@ describe('onErrorResumeNext operator', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

throwError(new Error('Some error')).pipe(
onErrorResumeNext(synchronousObservable),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should work with promise', (done: MochaDone) => {
const expected = [1, 2];
const source = concat(of(1), throwError('meh'));
Expand Down
23 changes: 22 additions & 1 deletion spec/operators/skipUntil-spec.ts
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { Observable, of, Subject } from 'rxjs';
import { concat, defer, Observable, of, Subject } from 'rxjs';
import { skipUntil, mergeMap } from 'rxjs/operators';

declare function asDiagram(arg: string): Function;
Expand Down Expand Up @@ -246,4 +246,25 @@ describe('skipUntil', () => {
expectObservable(result).toBe(expected);
expectSubscriptions(notifier.subscriptions).toBe(nSubs);
});

it('should stop listening to a synchronous notifier after its first nexted value', () => {
// const source = hot('-^-o---o---o---o---o---o---|');
const sideEffects: number[] = [];
const synchronousNotifer = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);
of(null).pipe(skipUntil(synchronousNotifer)).subscribe(() => { /* noop */ });
expect(sideEffects).to.deep.equal([1]);
});
});
29 changes: 27 additions & 2 deletions spec/operators/switchMap-spec.ts
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { switchMap, mergeMap, map } from 'rxjs/operators';
import { of, Observable } from 'rxjs';
import { switchMap, mergeMap, map, takeWhile } from 'rxjs/operators';
import { concat, defer, of, Observable } from 'rxjs';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -169,6 +169,31 @@ describe('switchMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

of(null).pipe(
switchMap(() => synchronousObservable),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should switch inner cold observables, inner never completes', () => {
const x = cold( '--a--b--c--d--e--| ');
const xsubs = ' ^ ! ';
Expand Down
5 changes: 4 additions & 1 deletion src/internal/operators/catchError.ts
Expand Up @@ -3,6 +3,7 @@ import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';

import {OuterSubscriber} from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';
import {ObservableInput, OperatorFunction, MonoTypeOperatorFunction} from '../types';

Expand Down Expand Up @@ -121,7 +122,9 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, T | R> {
return;
}
this._unsubscribeAndRecycle();
this.add(subscribeToResult(this, result));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult(this, result, undefined, undefined, innerSubscriber);
}
}
}
17 changes: 12 additions & 5 deletions src/internal/operators/exhaustMap.ts
Expand Up @@ -106,15 +106,22 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private tryNext(value: T): void {
let result: ObservableInput<R>;
const index = this.index++;
const destination = this.destination;
try {
const result = this.project(value, index);
this.hasSubscription = true;
this.add(subscribeToResult(this, result, value, index));
result = this.project(value, index);
} catch (err) {
destination.error(err);
this.destination.error(err);
return;
}
this.hasSubscription = true;
this._innerSub(result, value, index);
}

private _innerSub(result: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult<T, R>(this, result, value, index, innerSubscriber);
}

protected _complete(): void {
Expand Down
4 changes: 3 additions & 1 deletion src/internal/operators/mergeScan.ts
Expand Up @@ -100,7 +100,9 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(ish: any, value: T, index: number): void {
this.add(subscribeToResult<T, R>(this, ish, value, index));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
}

protected _complete(): void {
Expand Down
4 changes: 3 additions & 1 deletion src/internal/operators/onErrorResumeNext.ts
Expand Up @@ -152,7 +152,9 @@ class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
private subscribeToNextSource(): void {
const next = this.nextSources.shift();
if (next) {
this.add(subscribeToResult(this, next));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult(this, next, undefined, undefined, innerSubscriber);
} else {
this.destination.complete();
}
Expand Down
5 changes: 4 additions & 1 deletion src/internal/operators/skipUntil.ts
Expand Up @@ -44,7 +44,10 @@ class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {

constructor(destination: Subscriber<R>, notifier: ObservableInput<any>) {
super(destination);
this.add(this.innerSubscription = subscribeToResult(this, notifier));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
this.innerSubscription = innerSubscriber;
subscribeToResult(this, notifier, undefined, undefined, innerSubscriber);
}

protected _next(value: T) {
Expand Down
4 changes: 3 additions & 1 deletion src/internal/operators/switchMap.ts
Expand Up @@ -113,7 +113,9 @@ class SwitchMapSubscriber<T, R> extends OuterSubscriber<T, R> {
if (innerSubscription) {
innerSubscription.unsubscribe();
}
this.add(this.innerSubscription = subscribeToResult(this, result, value, index));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
this.innerSubscription = subscribeToResult(this, result, value, index, innerSubscriber);
}

protected _complete(): void {
Expand Down