Skip to content

Commit

Permalink
fix(share): handle reentrant subscribes (#6151)
Browse files Browse the repository at this point in the history
* test: add failing reentrancy test for share

* fix(share): handle reentrant subscribes

Closes #6144

* chore: remove redundant type argument

* refactor: use SafeSubscriber

* chore: fix comment typo
  • Loading branch information
cartant committed Mar 25, 2021
1 parent 011ad57 commit fc728cd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
23 changes: 21 additions & 2 deletions spec/operators/share-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { share, retry, mergeMapTo, mergeMap, tap, repeat, take, takeUntil, takeWhile, materialize } from 'rxjs/operators';
import { Observable, EMPTY, NEVER, of, Subject, Observer, from } from 'rxjs';
import { share, retry, mergeMapTo, mergeMap, tap, repeat, take, takeUntil, takeWhile, materialize, map, startWith, withLatestFrom } from 'rxjs/operators';
import { Observable, EMPTY, NEVER, of, Subject, defer } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
import sinon = require('sinon');
Expand Down Expand Up @@ -332,6 +332,25 @@ describe('share', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

it('should not fail on reentrant subscription', () => {
// https://github.com/ReactiveX/rxjs/issues/6144
const source = cold('(123|)');
const subs = ['(^!) '];
const expected = '(136|)';

const deferred = defer(() => shared).pipe(
startWith(0)
);
const shared: Observable<string> = source.pipe(
withLatestFrom(deferred),
map(([a, b]) => String(Number(a) + Number(b))),
share()
);

expectObservable(shared).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

describe('share(config)', () => {
Expand Down
17 changes: 11 additions & 6 deletions src/internal/operators/share.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Subject } from '../Subject';

import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types';
import { Subscription } from '../Subscription';
import { SafeSubscriber } from '../Subscriber';
import { from } from '../observable/from';
import { operate } from '../util/lift';

Expand Down Expand Up @@ -94,7 +93,7 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
options = options || {};
const { connector = () => new Subject<T>(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options;

let connection: Subscription | null = null;
let connection: SafeSubscriber<T> | null = null;
let subject: SubjectLike<T> | null = null;
let refCount = 0;
let hasCompleted = false;
Expand All @@ -118,9 +117,14 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
subject.subscribe(subscriber);

if (!connection) {
connection = from(source).subscribe({
next: (value) => subject!.next(value),
error: (err) => {
// We need to create a subscriber here - rather than pass an observer and
// assign the returned subscription to connection - because it's possible
// for reentrant subscriptions to the shared observable to occur and in
// those situations we want connection to be already-assigned so that we
// don't create another connection to the source.
connection = new SafeSubscriber({
next: (value: T) => subject!.next(value),
error: (err: any) => {
hasErrored = true;
// We need to capture the subject before
// we reset (if we need to reset).
Expand All @@ -141,6 +145,7 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
dest.complete();
},
});
from(source).subscribe(connection);
}

// This is also added to `subscriber`, technically.
Expand Down

0 comments on commit fc728cd

Please sign in to comment.