Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 6805: remove strange groupBy behavior #7252

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions spec/Subject-spec.ts
Expand Up @@ -4,7 +4,7 @@ import { AnonymousSubject } from 'rxjs/internal/Subject';
import { delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';
import { OperatorSubscriber } from 'rxjs/internal/operators/OperatorSubscriber';
import { createOperatorSubscriber } from 'rxjs/internal/operators/OperatorSubscriber';

/** @test {Subject} */
describe('Subject', () => {
Expand Down Expand Up @@ -733,7 +733,7 @@ describe('Subject', () => {
const subject = new Subject<number>();
const destination = new Subscriber();
const results: any[] = [];
const subscriber = new OperatorSubscriber(destination, (value) => {
const subscriber = createOperatorSubscriber(destination, (value) => {
results.push(value);
}, () => {
results.push('complete');
Expand Down
20 changes: 10 additions & 10 deletions spec/operators/groupBy-spec.ts
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { groupBy, delay, tap, map, take, mergeMap, materialize, skip, ignoreElements } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { ReplaySubject, of, Observable, Operator, Observer, Subject, NextNotification, ErrorNotification } from 'rxjs';
import { ReplaySubject, of, Observable, Subject, NextNotification, ErrorNotification } from 'rxjs';
import { createNotification } from 'rxjs/internal/NotificationFactories';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -534,8 +534,8 @@ describe('groupBy operator', () => {
});
});

it('should allow the outer to be unsubscribed early but inners continue', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
it('should unsubscribe inner subscriptions when the result unsubscribes', () => {
testScheduler.run(({ cold, hot, expectObservable }) => {
const values = {
a: ' foo',
b: ' FoO ',
Expand All @@ -551,10 +551,10 @@ describe('groupBy operator', () => {
l: ' fOo ',
};
const e1 = hot('-1--2--^-a-b-c-d-e-f-g-h-i-j-k-l-|', values);
const unsub = ' ---------! ';
const expected = ' --w---x--- ';
const w = cold(' a-b---d---------i-----l-| ', values);
const x = cold(' c-------g-h---------| ', values);
const unsub = ' ----------! ';
const expected = ' --w---x---- ';
const w = cold(' a-b---d-- ', values);
const x = cold(' c---- ', values);
const expectedValues = { w: w, x: x };

const source = e1.pipe(groupBy((val: string) => val.toLowerCase().trim()));
Expand Down Expand Up @@ -884,7 +884,7 @@ describe('groupBy operator', () => {
});
});

it('should allow using a durationSelector, and outer unsubscribed early', () => {
it('should allow using a durationSelector, and unsub from outer and inner at the same time', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const values = {
a: ' foo',
Expand All @@ -904,8 +904,8 @@ describe('groupBy operator', () => {
const unsub = ' -----------! ';
const expected = ' --v---w---x- ';
const v = cold(' a-b---(d|) ', values);
const w = cold(' c-------g-(h|) ', values);
const x = cold(' e---------j-(k|) ', values);
const w = cold(' c----- ', values);
const x = cold(' e- ', values);
const expectedValues = { v: v, w: w, x: x };

const source = e1.pipe(
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/onErrorResumeNext.ts
@@ -1,7 +1,7 @@
import { Observable } from '../Observable';
import { ObservableInputTuple } from '../types';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { noop } from '../util/noop';
import { from } from './from';

Expand Down Expand Up @@ -89,7 +89,7 @@ export function onErrorResumeNext<A extends readonly unknown[]>(
subscribeNext();
return;
}
const innerSubscriber = new OperatorSubscriber(subscriber, undefined, noop, noop);
const innerSubscriber = createOperatorSubscriber(subscriber, undefined, noop, noop);
nextSource.subscribe(innerSubscriber);
innerSubscriber.add(subscribeNext);
} else {
Expand Down
19 changes: 6 additions & 13 deletions src/internal/operators/OperatorSubscriber.ts
Expand Up @@ -26,7 +26,7 @@ export function createOperatorSubscriber<T>(
* A generic helper for allowing operators to be created with a Subscriber and
* use closures to capture necessary state from the operator function itself.
*/
export class OperatorSubscriber<T> extends Subscriber<T> {
class OperatorSubscriber<T> extends Subscriber<T> {
/**
* Creates an instance of an `OperatorSubscriber`.
* @param destination The downstream subscriber.
Expand All @@ -38,18 +38,13 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
* this handler are sent to the `destination` error handler.
* @param onFinalize Additional finalization logic here. This will only be called on finalization if the
* subscriber itself is not already closed. This is called after all other finalization logic is executed.
* @param shouldUnsubscribe An optional check to see if an unsubscribe call should truly unsubscribe.
* NOTE: This currently **ONLY** exists to support the strange behavior of {@link groupBy}, where unsubscription
* to the resulting observable does not actually disconnect from the source if there are active subscriptions
* to any grouped observable. (DO NOT EXPOSE OR USE EXTERNALLY!!!)
*/
constructor(
destination: Subscriber<any>,
onNext?: (value: T) => void,
onComplete?: () => void,
onError?: (err: any) => void,
private onFinalize?: () => void,
private shouldUnsubscribe?: () => boolean
private onFinalize?: () => void
) {
// It's important - for performance reasons - that all of this class's
// members are initialized and that they are always initialized in the same
Expand Down Expand Up @@ -102,11 +97,9 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
}

unsubscribe() {
if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) {
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
}
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
}
}
31 changes: 4 additions & 27 deletions src/internal/operators/groupBy.ts
Expand Up @@ -2,7 +2,7 @@ import { Observable } from '../Observable';
import { from } from '../observable/from';
import { Subject } from '../Subject';
import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types';
import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';

export interface BasicGroupByOptions<K, T> {
element?: undefined;
Expand Down Expand Up @@ -165,20 +165,14 @@ export function groupBy<T, K, R>(
// next call from the source.
const handleError = (err: any) => notify((consumer) => consumer.error(err));

// The number of actively subscribed groups
let activeGroups = 0;

// Whether or not teardown was attempted on this subscription.
let teardownAttempted = false;

// Capturing a reference to this, because we need a handle to it
// in `createGroupedObservable` below. This is what we use to
// subscribe to our source observable. This sometimes needs to be unsubscribed
// out-of-band with our `subscriber` which is the downstream subscriber, or destination,
// in cases where a user unsubscribes from the main resulting subscription, but
// still has groups from this subscription subscribed and would expect values from it
// Consider: `source.pipe(groupBy(fn), take(2))`.
Comment on lines 168 to 174
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revisit this comment.

const groupBySourceSubscriber = new OperatorSubscriber(
const groupBySourceSubscriber = createOperatorSubscriber(
subscriber,
(value: T) => {
// Because we have to notify all groups of any errors that occur in here,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GH didn't allow me to put a comment in the exact spot lines 189-191 here but I think the comment also needs revisiting as the mentioned reference counting seem to be removed now.

Expand Down Expand Up @@ -240,14 +234,7 @@ export function groupBy<T, K, R>(
// When the source subscription is _finally_ torn down, release the subjects and keys
// in our groups Map, they may be quite large and we don't want to keep them around if we
// don't have to.
() => groups.clear(),
() => {
teardownAttempted = true;
// We only kill our subscription to the source if we have
// no active groups. As stated above, consider this scenario:
// source$.pipe(groupBy(fn), take(2)).
return activeGroups === 0;
}
() => groups.clear()
);

// Subscribe to the source
Expand All @@ -259,17 +246,7 @@ export function groupBy<T, K, R>(
* @param groupSubject The subject that fuels the group
*/
function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
const result: any = new Observable<T>((groupSubscriber) => {
activeGroups++;
const innerSub = groupSubject.subscribe(groupSubscriber);
return () => {
innerSub.unsubscribe();
// We can kill the subscription to our source if we now have no more
// active groups subscribed, and a finalization was already attempted on
// the source.
--activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe();
};
});
const result: any = new Observable<T>((groupSubscriber) => groupSubject.subscribe(groupSubscriber));
result.key = key;
return result;
}
Expand Down