Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(share): propagate closed to firehose sources #6370

Merged
merged 5 commits into from May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 0 additions & 43 deletions spec/operators/multicast-spec.ts
Expand Up @@ -821,47 +821,4 @@ describe('multicast', () => {
);
});
});

// TODO: fix firehose unsubscription
// AFAICT, it's not possible for multicast observables to support ASAP
// unsubscription from synchronous firehose sources. The problem is that the
// chaining of the closed 'signal' is broken by the subject. For example,
// here:
//
// https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/operators/multicast.ts#L53
//
// The subject is passed to subscribe. However, in the subscribe
// implementation a SafeSubscriber is created with the subject as the
// observer:
//
// https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/Observable.ts#L210
//
// That breaks the chaining of closed - i.e. even if the unsubscribe is
// called on the subject, closing it, the SafeSubscriber's closed property
// won't reflect that.
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
// This will check to see if the subscriber was closed on each loop
// when the unsubscribe hits (from the `take`), it should be closed
for (let i = 0; !subscriber.closed && i < 10; i++) {
sideEffects.push(i);
subscriber.next(i);
}
});

synchronousObservable
.pipe(
multicast(
() => new Subject<number>(),
(source) => source
),
take(3)
)
.subscribe(() => {
/* noop */
});

expect(sideEffects).to.deep.equal([0, 1, 2]);
});
});
21 changes: 0 additions & 21 deletions spec/operators/refCount-spec.ts
Expand Up @@ -114,25 +114,4 @@ describe('refCount', () => {
expect(arr[0]).to.equal('the number one');
expect(arr[1]).to.equal('the number two');
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
// when the unsubscribe hits (from the `take`), it should be closed
for (let i = 0; !subscriber.closed && i < 10; i++) {
sideEffects.push(i);
subscriber.next(i);
}
});

synchronousObservable.pipe(
multicast(() => new Subject<number>()),
refCount(),
take(3),
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([0, 1, 2]);
});
});
3 changes: 1 addition & 2 deletions spec/operators/share-spec.ts
Expand Up @@ -427,8 +427,7 @@ describe('share', () => {
});
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
// This will check to see if the subscriber was closed on each loop
Expand Down
19 changes: 0 additions & 19 deletions spec/operators/shareReplay-spec.ts
Expand Up @@ -347,25 +347,6 @@ describe('shareReplay', () => {
});
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
// This will check to see if the subscriber was closed on each loop
// when the unsubscribe hits (from the `take`), it should be closed
for (let i = 0; !subscriber.closed && i < 10; i++) {
sideEffects.push(i);
subscriber.next(i);
}
});

synchronousObservable.pipe(shareReplay(), take(3)).subscribe(() => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we'd still want this test for refCount: true.

/* noop */
});

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

const FinalizationRegistry = (global as any).FinalizationRegistry;
if (FinalizationRegistry) {
it('should not leak the subscriber for sync sources', (done) => {
Expand Down
36 changes: 20 additions & 16 deletions src/internal/operators/share.ts
Expand Up @@ -112,6 +112,26 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
// Create the subject if we don't have one yet.
subject = subject ?? connector();

// Add the teardown directly to the subscriber - instead of returning it -
// so that the handling of the subscriber's unsubscription will be wired
// up _before_ the subscription to the source occurs. This is done so that
// the assignment to the source connection's `closed` property will be seen
// by synchronous firehose sources.
subscriber.add(() => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🏆

refCount--;

// If we're resetting on refCount === 0, and it's 0, we only want to do
// that on "unsubscribe", really. Resetting on error or completion is a different
// configuration.
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
// We need to capture the connection before
// we reset (if we need to reset).
const conn = connection;
reset();
conn?.unsubscribe();
}
});

// The following line adds the subscription to the subscriber passed.
// Basically, `subscriber === subject.subscribe(subscriber)` is `true`.
subject.subscribe(subscriber);
Expand Down Expand Up @@ -147,21 +167,5 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
});
from(source).subscribe(connection);
}

// This is also added to `subscriber`, technically.
return () => {
refCount--;

// If we're resetting on refCount === 0, and it's 0, we only want to do
// that on "unsubscribe", really. Resetting on error or completion is a different
// configuration.
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
// We need to capture the connection before
// we reset (if we need to reset).
const conn = connection;
reset();
conn?.unsubscribe();
}
};
});
}