Skip to content

Commit

Permalink
fix(Subscriber): Can no longer subscribe to itself in a circular mann…
Browse files Browse the repository at this point in the history
…er (#4106)

* test(mergeMap): add failing test

* fix(subscriber): don't unsubscribe self

When unsubscribing a subscriber's parent, make sure that the subscriber
itself is not unsubscribed.

Closes #4095

* refactor(mergeMap): simplify

* chore(typings): use union type for destination

* chore(test): remove only

* chore(test): use pipe

* test(Subscriber): fake add too

* test(internals): update for subscription changes

* refactor(zip): add to destination

* refactor(delay): add to destination

* refactor(delayWhen): add to destination

* refactor(exhaustMap): add to destination

* refactor(expand): add to destination

* refactor(mergeScan): add to destination

* refactor(observeOn): add to destination

* refactor(onErrorResumeNext): add to destination

* refactor(sequenceEqual): add to destination

* refactor(switchMap): add to destination

* chore(Subscriber): remove _addParentTeardownLogic

* chore(test): simplify mergeMap test

Remove the mapTo and the concat to reduce the number of subscribers to
make the test easier to reason with.
  • Loading branch information
cartant authored and benlesh committed Sep 25, 2018
1 parent 02780dd commit e623ec6
Show file tree
Hide file tree
Showing 18 changed files with 118 additions and 61 deletions.
3 changes: 2 additions & 1 deletion spec/Subscriber-spec.ts
Expand Up @@ -23,7 +23,8 @@ describe('Subscriber', () => {
it('should accept subscribers as a destination if they meet the proper criteria', () => {
const fakeSubscriber = {
[rxSubscriber](this: any) { return this; },
_addParentTeardownLogic() { /* noop */ }
add() { /* noop */ },
syncErrorThrowable: false
};

const subscriber = new Subscriber(fakeSubscriber as any);
Expand Down
30 changes: 27 additions & 3 deletions spec/operators/mergeMap-spec.ts
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { mergeMap, map } from 'rxjs/operators';
import { asapScheduler, defer, Observable, from, of } from 'rxjs';
import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';

declare const type: Function;
Expand Down Expand Up @@ -717,7 +717,7 @@ describe('mergeMap', () => {
// Added as a failing test when investigating:
// https://github.com/ReactiveX/rxjs/issues/4071

const results: any[] = [];
const results: (number | string)[] = [];

of(1).pipe(
mergeMap(() => defer(() =>
Expand All @@ -744,7 +744,7 @@ describe('mergeMap', () => {
// Added as a failing test when investigating:
// https://github.com/ReactiveX/rxjs/issues/4071

const results: any[] = [];
const results: (number | string)[] = [];

of(1).pipe(
mergeMap(() =>
Expand All @@ -764,6 +764,30 @@ describe('mergeMap', () => {
}, 0);
});

it('should support wrapped sources', (done: MochaDone) => {

// Added as a failing test when investigating:
// https://github.com/ReactiveX/rxjs/issues/4095

const results: (number | string)[] = [];

const wrapped = new Observable<number>(subscriber => {
const subscription = timer(0, asapScheduler).subscribe(subscriber);
return () => subscription.unsubscribe();
});
wrapped.pipe(
mergeMap(() => timer(0, asapScheduler))
).subscribe({
next(value) { results.push(value); },
complete() { results.push('done'); }
});

setTimeout(() => {
expect(results).to.deep.equal([0, 'done']);
done();
}, 0);
});

type('should support type signatures', () => {
let o: Observable<number>;

Expand Down
20 changes: 9 additions & 11 deletions spec/operators/observeOn-spec.ts
Expand Up @@ -103,23 +103,21 @@ describe('observeOn operator', () => {
.pipe(observeOn(asapScheduler))
.subscribe(
x => {
const observeOnSubscriber = subscription._subscriptions[0];
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // one for the consumer, and one for the notification
expect(observeOnSubscriber._subscriptions[1].state.notification.kind)
.to.equal('N');
expect(observeOnSubscriber._subscriptions[1].state.notification.value)
.to.equal(x);
// see #4106 - inner subscriptions are now added to destinations
// so the subscription will contain an ObserveOnSubscriber and a subscription for the scheduled action
expect(subscription._subscriptions.length).to.equal(2);
const actionSubscription = subscription._subscriptions[1];
expect(actionSubscription.state.notification.kind).to.equal('N');
expect(actionSubscription.state.notification.value).to.equal(x);
results.push(x);
},
err => done(err),
() => {
// now that the last nexted value is done, there should only be a complete notification scheduled
// the consumer will have been unsubscribed via Subscriber#_parentSubscription
const observeOnSubscriber = subscription._subscriptions[0];
expect(observeOnSubscriber._subscriptions.length).to.equal(1); // one for the complete notification
// only this completion notification should remain.
expect(observeOnSubscriber._subscriptions[0].state.notification.kind)
.to.equal('C');
expect(subscription._subscriptions.length).to.equal(1);
const actionSubscription = subscription._subscriptions[0];
expect(actionSubscription.state.notification.kind).to.equal('C');
// After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this.
expect(results).to.deep.equal([1, 2, 3]);
done();
Expand Down
21 changes: 13 additions & 8 deletions spec/operators/switch-spec.ts
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { Observable, of, NEVER, queueScheduler, Subject } from 'rxjs';
import { switchAll } from 'rxjs/operators';
import { map, switchAll } from 'rxjs/operators';

declare function asDiagram(arg: string): Function;
declare const type: Function;
Expand Down Expand Up @@ -222,9 +222,9 @@ describe('switchAll', () => {
it('should not leak when child completes before each switch (prevent memory leaks #2355)', () => {
let iStream: Subject<number>;
const oStreamControl = new Subject<number>();
const oStream = oStreamControl.map(() => {
return (iStream = new Subject<number>());
});
const oStream = oStreamControl.pipe(
map(() => (iStream = new Subject<number>()))
);
const switcher = oStream.pipe(switchAll());
const result: number[] = [];
let sub = switcher.subscribe((x) => result.push(x));
Expand All @@ -242,19 +242,24 @@ describe('switchAll', () => {

it('should not leak if we switch before child completes (prevent memory leaks #2355)', () => {
const oStreamControl = new Subject<number>();
const oStream = oStreamControl.map(() => {
return (new Subject<number>());
});
const oStream = oStreamControl.pipe(
map(() => new Subject<number>())
);
const switcher = oStream.pipe(switchAll());
const result: number[] = [];
let sub = switcher.subscribe((x) => result.push(x));

[0, 1, 2, 3, 4].forEach((n) => {
oStreamControl.next(n); // creates inner
});
// Expect two children of switch(): The oStream and the first inner
// Expect one child of switch(): The oStream
expect(
(sub as any)._subscriptions[0]._subscriptions.length
).to.equal(1);
// Expect two children of subscribe(): The destination and the first inner
// See #4106 - inner subscriptions are now added to destinations
expect(
(sub as any)._subscriptions.length
).to.equal(2);
sub.unsubscribe();
});
Expand Down
2 changes: 1 addition & 1 deletion src/internal/Observable.ts
Expand Up @@ -199,7 +199,7 @@ export class Observable<T> implements Subscribable<T> {
if (operator) {
operator.call(sink, this.source);
} else {
sink._addParentTeardownLogic(
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
Expand Down
22 changes: 3 additions & 19 deletions src/internal/Subscriber.ts
Expand Up @@ -45,7 +45,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
/** @internal */ syncErrorThrowable: boolean = false;

protected isStopped: boolean = false;
protected destination: PartialObserver<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)
protected destination: PartialObserver<any> | Subscriber<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)

private _parentSubscription: Subscription | null = null;

Expand Down Expand Up @@ -78,7 +78,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
const trustedSubscriber = destinationOrNext[rxSubscriberSymbol]() as Subscriber<any>;
this.syncErrorThrowable = trustedSubscriber.syncErrorThrowable;
this.destination = trustedSubscriber;
trustedSubscriber._addParentTeardownLogic(this);
trustedSubscriber.add(this);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
Expand Down Expand Up @@ -116,7 +116,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
if (!this.isStopped) {
this.isStopped = true;
this._error(err);
this._unsubscribeParentSubscription();
}
}

Expand All @@ -130,7 +129,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
if (!this.isStopped) {
this.isStopped = true;
this._complete();
this._unsubscribeParentSubscription();
}
}

Expand All @@ -156,20 +154,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this.unsubscribe();
}

/** @deprecated This is an internal implementation detail, do not use. */
_addParentTeardownLogic(parentTeardownLogic: TeardownLogic) {
if (parentTeardownLogic !== this) {
this._parentSubscription = this.add(parentTeardownLogic);
}
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribeParentSubscription() {
if (this._parentSubscription !== null) {
this._parentSubscription.unsubscribe();
}
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribeAndRecycle(): Subscriber<T> {
const { _parent, _parents } = this;
Expand Down Expand Up @@ -326,5 +310,5 @@ export class SafeSubscriber<T> extends Subscriber<T> {
}

export function isTrustedSubscriber(obj: any) {
return obj instanceof Subscriber || ('_addParentTeardownLogic' in obj && obj[rxSubscriberSymbol]);
return obj instanceof Subscriber || ('syncErrorThrowable' in obj && obj[rxSubscriberSymbol]);
}
1 change: 0 additions & 1 deletion src/internal/Subscription.ts
Expand Up @@ -45,7 +45,6 @@ export class Subscription implements SubscriptionLike {
constructor(unsubscribe?: () => void) {
if (unsubscribe) {
(<any> this)._unsubscribe = unsubscribe;

}
}

Expand Down
6 changes: 5 additions & 1 deletion src/internal/observable/zip.ts
Expand Up @@ -4,6 +4,7 @@ import { isArray } from '../util/isArray';
import { Operator } from '../Operator';
import { ObservableInput, PartialObserver } from '../types';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
Expand Down Expand Up @@ -126,6 +127,8 @@ export class ZipSubscriber<T, R> extends Subscriber<T> {
const iterators = this.iterators;
const len = iterators.length;

this.unsubscribe();

if (len === 0) {
this.destination.complete();
return;
Expand All @@ -135,7 +138,8 @@ export class ZipSubscriber<T, R> extends Subscriber<T> {
for (let i = 0; i < len; i++) {
let iterator: ZipBufferIterator<any, any> = <any>iterators[i];
if (iterator.stillUnsubscribed) {
this.add(iterator.subscribe(iterator, i));
const destination = this.destination as Subscription;
destination.add(iterator.subscribe(iterator, i));
} else {
this.active--; // not an observable
}
Expand Down
6 changes: 5 additions & 1 deletion src/internal/operators/delay.ts
Expand Up @@ -2,6 +2,7 @@ import { async } from '../scheduler/async';
import { isDate } from '../util/isDate';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { Notification } from '../Notification';
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction, PartialObserver, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';
Expand Down Expand Up @@ -110,7 +111,8 @@ class DelaySubscriber<T> extends Subscriber<T> {

private _schedule(scheduler: SchedulerLike): void {
this.active = true;
this.add(scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch, this.delay, {
const destination = this.destination as Subscription;
destination.add(scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch, this.delay, {
source: this, destination: this.destination, scheduler: scheduler
}));
}
Expand All @@ -137,10 +139,12 @@ class DelaySubscriber<T> extends Subscriber<T> {
this.errored = true;
this.queue = [];
this.destination.error(err);
this.unsubscribe();
}

protected _complete() {
this.scheduleNotification(Notification.createComplete());
this.unsubscribe();
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/internal/operators/delayWhen.ts
Expand Up @@ -132,6 +132,7 @@ class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
protected _complete(): void {
this.completed = true;
this.tryComplete();
this.unsubscribe();
}

private removeSubscription(subscription: InnerSubscriber<T, R>): T {
Expand All @@ -149,7 +150,8 @@ class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
const notifierSubscription = subscribeToResult(this, delayNotifier, value);

if (notifierSubscription && !notifierSubscription.closed) {
this.add(notifierSubscription);
const destination = this.destination as Subscription;
destination.add(notifierSubscription);
this.delayNotifierSubscriptions.push(notifierSubscription);
}
}
Expand Down Expand Up @@ -199,6 +201,7 @@ class SubscriptionDelaySubscriber<T> extends Subscriber<T> {
}

protected _complete() {
this.unsubscribe();
this.subscribeToSource();
}

Expand Down
7 changes: 5 additions & 2 deletions src/internal/operators/exhaustMap.ts
Expand Up @@ -120,7 +120,8 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {

private _innerSub(result: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult<T, R>(this, result, value, index, innerSubscriber);
}

Expand All @@ -129,6 +130,7 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
if (!this.hasSubscription) {
this.destination.complete();
}
this.unsubscribe();
}

notifyNext(outerValue: T, innerValue: R,
Expand All @@ -142,7 +144,8 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
}

notifyComplete(innerSub: Subscription): void {
this.remove(innerSub);
const destination = this.destination as Subscription;
destination.remove(innerSub);

this.hasSubscription = false;
if (this.hasCompleted) {
Expand Down
10 changes: 7 additions & 3 deletions src/internal/operators/expand.ts
Expand Up @@ -133,7 +133,8 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
this.subscribeToProjection(result, value, index);
} else {
const state: DispatchArg<T, R> = { subscriber: this, result, value, index };
this.add(this.scheduler.schedule<DispatchArg<T, R>>(ExpandSubscriber.dispatch, 0, state));
const destination = this.destination as Subscription;
destination.add(this.scheduler.schedule<DispatchArg<T, R>>(ExpandSubscriber.dispatch, 0, state));
}
} else {
this.buffer.push(value);
Expand All @@ -142,14 +143,16 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {

private subscribeToProjection(result: any, value: T, index: number): void {
this.active++;
this.add(subscribeToResult<T, R>(this, result, value, index));
const destination = this.destination as Subscription;
destination.add(subscribeToResult<T, R>(this, result, value, index));
}

protected _complete(): void {
this.hasCompleted = true;
if (this.hasCompleted && this.active === 0) {
this.destination.complete();
}
this.unsubscribe();
}

notifyNext(outerValue: T, innerValue: R,
Expand All @@ -160,7 +163,8 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {

notifyComplete(innerSub: Subscription): void {
const buffer = this.buffer;
this.remove(innerSub);
const destination = this.destination as Subscription;
destination.remove(innerSub);
this.active--;
if (buffer && buffer.length > 0) {
this._next(buffer.shift());
Expand Down
4 changes: 3 additions & 1 deletion src/internal/operators/mergeMap.ts
Expand Up @@ -140,7 +140,8 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {

private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
}

Expand All @@ -149,6 +150,7 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
if (this.active === 0 && this.buffer.length === 0) {
this.destination.complete();
}
this.unsubscribe();
}

notifyNext(outerValue: T, innerValue: R,
Expand Down

0 comments on commit e623ec6

Please sign in to comment.