Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: chain subscriptions from interop observables for 6.x #5178

Merged
merged 2 commits into from Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions spec/helpers/interop-helper-spec.ts
@@ -0,0 +1,19 @@
import { expect } from 'chai';
import { Observable, of, Subscriber } from 'rxjs';
import { observable as symbolObservable } from 'rxjs/internal/symbol/observable';
import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber';
import { asInteropObservable, asInteropSubscriber } from './interop-helper';

describe('interop helper', () => {
it('should simulate interop observables', () => {
const observable = asInteropObservable(of(42));
expect(observable).to.not.be.instanceOf(Observable);
expect(observable[symbolObservable]).to.be.a('function');
});

it('should simulate interop subscribers', () => {
const subscriber = asInteropSubscriber(new Subscriber());
expect(subscriber).to.not.be.instanceOf(Subscriber);
expect(subscriber[symbolSubscriber]).to.be.undefined;
});
});
57 changes: 57 additions & 0 deletions spec/helpers/interop-helper.ts
@@ -0,0 +1,57 @@
import { Observable, Subscriber, Subscription } from 'rxjs';
import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber';

/**
* Returns an observable that will be deemed by this package's implementation
* to be an observable that requires interop. The returned observable will fail
* the `instanceof Observable` test and will deem any `Subscriber` passed to
* its `subscribe` method to be untrusted.
*/
export function asInteropObservable<T>(observable: Observable<T>): Observable<T> {
return new Proxy(observable, {
get(target: Observable<T>, key: string | number | symbol) {
if (key === 'subscribe') {
const { subscribe } = target;
return interopSubscribe(subscribe);
}
return Reflect.get(target, key);
},
getPrototypeOf(target: Observable<T>) {
const { subscribe, ...rest } = Object.getPrototypeOf(target);
return {
...rest,
subscribe: interopSubscribe(subscribe)
};
}
});
}

/**
* Returns a subscriber that will be deemed by this package's implementation to
* be untrusted. The returned subscriber will fail the `instanceof Subscriber`
* test and will not include the symbol that identifies trusted subscribers.
*/
export function asInteropSubscriber<T>(subscriber: Subscriber<T>): Subscriber<T> {
return new Proxy(subscriber, {
get(target: Subscriber<T>, key: string | number | symbol) {
if (key === symbolSubscriber) {
return undefined;
}
return Reflect.get(target, key);
},
getPrototypeOf(target: Subscriber<T>) {
const { [symbolSubscriber]: symbol, ...rest } = Object.getPrototypeOf(target);
return rest;
}
});
}

function interopSubscribe<T>(subscribe: (...args: any[]) => Subscription) {
return function (this: Observable<T>, ...args: any[]): Subscription {
const [arg] = args;
if (arg instanceof Subscriber) {
return subscribe.call(this, asInteropSubscriber(arg));
}
return subscribe.apply(this, args);
};
}
22 changes: 22 additions & 0 deletions spec/operators/catch-spec.ts
Expand Up @@ -5,6 +5,7 @@ import { TestScheduler } from 'rxjs/testing';
import * as sinon from 'sinon';
import { createObservableInputs } from '../helpers/test-helper';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -121,6 +122,27 @@ describe('catchError operator', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should unsubscribe from a caught cold caught interop observable when unsubscribed explicitly', () => {
const e1 = hot('-1-2-3-# ');
const e1subs = '^ ! ';
const e2 = cold( '5-6-7-8-9-|');
const e2subs = ' ^ ! ';
const expected = '-1-2-3-5-6-7- ';
const unsub = ' ! ';

// This test is the same as the previous test, but the observable is
// manipulated to make it look like an interop observable - an observable
// from a foreign library. Interop subscribers are treated differently:
// they are wrapped in a safe subscriber. This test ensures that
// unsubscriptions are chained all the way to the interop subscriber.

const result = e1.pipe(catchError(() => asInteropObservable(e2)));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
Expand Down
34 changes: 34 additions & 0 deletions spec/operators/exhaustMap-spec.ts
Expand Up @@ -2,6 +2,7 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/mar
import { concat, defer, Observable, of, from } from 'rxjs';
import { exhaustMap, mergeMap, takeWhile, map } from 'rxjs/operators';
import { expect } from 'chai';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -202,6 +203,39 @@ describe('exhaustMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
const x = cold( '--a--b--c--| ');
const xsubs = ' ^ ! ';
const y = cold( '--d--e--f--| ');
const ysubs: string[] = [];
const z = cold( '--g--h--i--| ');
const zsubs = ' ^ ! ';
const e1 = hot('---x---------y-----------------z-------------|');
const e1subs = '^ ! ';
const expected = '-----a--b--c---------------------g- ';
const unsub = ' ! ';

const observableLookup = { x: x, y: y, z: z };

// This test is the same as the previous test, but the observable is
// manipulated to make it look like an interop observable - an observable
// from a foreign library. Interop subscribers are treated differently:
// they are wrapped in a safe subscriber. This test ensures that
// unsubscriptions are chained all the way to the interop subscriber.

const result = e1.pipe(
mergeMap(x => of(x)),
exhaustMap(value => asInteropObservable(observableLookup[value])),
mergeMap(x => of(x))
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
expectSubscriptions(z.subscriptions).toBe(zsubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
Expand Down
31 changes: 31 additions & 0 deletions spec/operators/mergeMap-spec.ts
Expand Up @@ -2,6 +2,7 @@ import { expect } from 'chai';
import { mergeMap, map } from 'rxjs/operators';
import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { asInteropObservable } from '../helpers/interop-helper';

declare const type: Function;
declare const asDiagram: Function;
Expand Down Expand Up @@ -260,6 +261,36 @@ describe('mergeMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
const x = cold( '--a--b--c--d--e--| ');
const xsubs = ' ^ ! ';
const y = cold( '---f---g---h---i--|');
const ysubs = ' ^ ! ';
const e1 = hot('---------x---------y---------| ');
const e1subs = '^ ! ';
const expected = '-----------a--b--c--d- ';
const unsub = ' ! ';

const observableLookup = { x: x, y: y };

// This test manipulates the observable to make it look like an interop
// observable - an observable from a foreign library. Interop subscribers
// are treated differently: they are wrapped in a safe subscriber. This
// test ensures that unsubscriptions are chained all the way to the
// interop subscriber.

const result = e1.pipe(
mergeMap(x => of(x)),
mergeMap(value => asInteropObservable(observableLookup[value])),
mergeMap(x => of(x)),
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should mergeMap many outer to many inner, inner never completes', () => {
const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
const e1 = hot('-a-------b-------c-------d-------| ');
Expand Down
18 changes: 18 additions & 0 deletions spec/operators/onErrorResumeNext-spec.ts
Expand Up @@ -2,6 +2,7 @@ import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { onErrorResumeNext, takeWhile } from 'rxjs/operators';
import { concat, defer, throwError, of } from 'rxjs';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -129,6 +130,23 @@ describe('onErrorResumeNext operator', () => {
expect(sideEffects).to.deep.equal([1, 2]);
});

it('should unsubscribe from an interop observble upon explicit unsubscription', () => {
const source = hot('--a--b--#');
const next = cold( '--c--d--');
const nextSubs = ' ^ !';
const subs = '^ !';
const expected = '--a--b----c--';

// This test manipulates the observable to make it look like an interop
// observable - an observable from a foreign library. Interop subscribers
// are treated differently: they are wrapped in a safe subscriber. This
// test ensures that unsubscriptions are chained all the way to the
// interop subscriber.

expectObservable(source.pipe(onErrorResumeNext(asInteropObservable(next))), subs).toBe(expected);
expectSubscriptions(next.subscriptions).toBe(nextSubs);
});

it('should work with promise', (done: MochaDone) => {
const expected = [1, 2];
const source = concat(of(1), throwError('meh'));
Expand Down
27 changes: 26 additions & 1 deletion spec/operators/skipUntil-spec.ts
Expand Up @@ -2,6 +2,7 @@ import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { concat, defer, Observable, of, Subject } from 'rxjs';
import { skipUntil, mergeMap } from 'rxjs/operators';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -97,6 +98,31 @@ describe('skipUntil', () => {
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
const e1 = hot('--a--b--c--d--e----|');
const e1subs = '^ ! ';
const skip = hot('-------------x--| ');
const skipSubs = '^ ! ';
const expected = ('---------- ');
const unsub = ' ! ';

// This test is the same as the previous test, but the observable is
// manipulated to make it look like an interop observable - an observable
// from a foreign library. Interop subscribers are treated differently:
// they are wrapped in a safe subscriber. This test ensures that
// unsubscriptions are chained all the way to the interop subscriber.

const result = e1.pipe(
mergeMap(x => of(x)),
skipUntil(asInteropObservable(skip)),
mergeMap(x => of(x)),
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should skip all elements when notifier is empty', () => {
const e1 = hot('--a--b--c--d--e--|');
const e1subs = '^ !';
Expand Down Expand Up @@ -248,7 +274,6 @@ describe('skipUntil', () => {
});

it('should stop listening to a synchronous notifier after its first nexted value', () => {
// const source = hot('-^-o---o---o---o---o---o---|');
const sideEffects: number[] = [];
const synchronousNotifer = concat(
defer(() => {
Expand Down
31 changes: 31 additions & 0 deletions spec/operators/switchMap-spec.ts
Expand Up @@ -2,6 +2,7 @@ import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { switchMap, mergeMap, map, takeWhile } from 'rxjs/operators';
import { concat, defer, of, Observable } from 'rxjs';
import { asInteropObservable } from '../helpers/interop-helper';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -169,6 +170,36 @@ describe('switchMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
const x = cold( '--a--b--c--d--e--| ');
const xsubs = ' ^ ! ';
const y = cold( '---f---g---h---i--|');
const ysubs = ' ^ ! ';
const e1 = hot('---------x---------y---------| ');
const e1subs = '^ ! ';
const expected = '-----------a--b--c---- ';
const unsub = ' ! ';

const observableLookup = { x: x, y: y };

// This test is the same as the previous test, but the observable is
// manipulated to make it look like an interop observable - an observable
// from a foreign library. Interop subscribers are treated differently:
// they are wrapped in a safe subscriber. This test ensures that
// unsubscriptions are chained all the way to the interop subscriber.

const result = e1.pipe(
mergeMap(x => of(x)),
switchMap(value => asInteropObservable(observableLookup[value])),
mergeMap(x => of(x)),
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
Expand Down
5 changes: 4 additions & 1 deletion spec/tsconfig.json
@@ -1,3 +1,6 @@
{
"extends": "../tsconfig.json"
"extends": "../tsconfig.json",
"compilerOptions": {
"lib": ["esnext", "dom"]
}
}
2 changes: 1 addition & 1 deletion spec/util/toSubscriber-spec.ts
Expand Up @@ -12,7 +12,7 @@ describe('toSubscriber', () => {
expect(sub2.closed).to.be.true;
});

it('should not be closed when other subscriber created with same observer instance completes', () => {
it('should not be closed when other subscriber created with same observer instance completes', () => {
let observer = {
next: function () { /*noop*/ }
};
Expand Down
8 changes: 7 additions & 1 deletion src/internal/operators/catchError.ts
Expand Up @@ -137,7 +137,13 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, T | R> {
this._unsubscribeAndRecycle();
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult(this, result, undefined, undefined, innerSubscriber);
const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
this.add(innerSubscription);
}
}
}
}
10 changes: 8 additions & 2 deletions src/internal/operators/exhaustMap.ts
Expand Up @@ -122,10 +122,16 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(result: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult<T, R>(this, result, value, index, innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, result, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
}

protected _complete(): void {
Expand Down