Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: concat/merge operators will finalize inners before moving to the next #6010

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 36 additions & 2 deletions spec/operators/concatAll-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { from, throwError, of, Observable } from 'rxjs';
import { concatAll, take, mergeMap } from 'rxjs/operators';
import { from, throwError, of, Observable, defer } from 'rxjs';
import { concatAll, take, mergeMap, finalize, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -58,6 +58,40 @@ describe('concatAll operator', () => {
);
});

it('should finalize before moving to the next observable', () => {
const results: any[] = [];

const create = (n: number) => defer(() => {
results.push(`init ${n}`);
return of(`next ${n}`).pipe(
delay(100, testScheduler),
finalize(() => {
results.push(`finalized ${n}`)
})
);
});

of(create(1), create(2), create(3)).pipe(
concatAll()
).subscribe({
next: value => results.push(value),
});

testScheduler.flush();

expect(results).to.deep.equal([
'init 1',
'next 1',
'finalized 1',
'init 2',
'next 2',
'finalized 2',
'init 3',
'next 3',
'finalized 3'
]);
});

it('should concat and raise error from promise', function(done) {
this.timeout(2000);

Expand Down
38 changes: 36 additions & 2 deletions spec/operators/concatMap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { of, from, Observable } from 'rxjs';
import { concatMap, mergeMap, map, take } from 'rxjs/operators';
import { of, from, Observable, defer } from 'rxjs';
import { concatMap, mergeMap, map, take, finalize, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -677,6 +677,40 @@ describe('Observable.prototype.concatMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should finalize before moving to the next observable', () => {
const results: any[] = [];

const create = (n: number) => defer(() => {
results.push(`init ${n}`);
return of(`next ${n}`).pipe(
delay(100, testScheduler),
finalize(() => {
results.push(`finalized ${n}`)
})
);
});

of(1, 2, 3).pipe(
concatMap(n => create(n))
).subscribe({
next: value => results.push(value),
});

testScheduler.flush();

expect(results).to.deep.equal([
'init 1',
'next 1',
'finalized 1',
'init 2',
'next 2',
'finalized 2',
'init 3',
'next 3',
'finalized 3'
]);
});

function arrayRepeat(value: string, times: number) {
let results = [];
Expand Down
11 changes: 6 additions & 5 deletions src/internal/operators/OperatorSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
* and send to the `destination` error handler.
* @param onComplete Handles completion notification from the subscription. Any errors that occur in
* this handler are sent to the `destination` error handler.
* @param onUnsubscribe Additional teardown logic here. This will only be called on teardown if the
* subscriber itself is not already closed. Called before any additional teardown logic is called.
* @param onFinalize Additional teardown logic here. This will only be called on teardown if the
* subscriber itself is not already closed. This is called after all other teardown logic is executed.
*/
constructor(
destination: Subscriber<any>,
onNext?: (value: T) => void,
onError?: (err: any) => void,
onComplete?: () => void,
private onUnsubscribe?: () => void
private onFinalize?: () => void
) {
// It's important - for performance reasons - that all of this class's
// members are initialized and that they are always initialized in the same
Expand Down Expand Up @@ -73,8 +73,9 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
}

unsubscribe() {
// Execute additional teardown if we have any and we didn't already do so.
!this.closed && this.onUnsubscribe?.();
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
}
}
59 changes: 40 additions & 19 deletions src/internal/operators/mergeInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export function mergeInternals<T, R>(
additionalTeardown?: () => void
) {
// Buffered values, in the event of going over our concurrency limit
let buffer: T[] = [];
const buffer: T[] = [];
// The number of active inner subscriptions.
let active = 0;
// An index to pass to our accumulator function
Expand Down Expand Up @@ -61,6 +61,11 @@ export function mergeInternals<T, R>(
// against our concurrency limit later.
active++;

// A flag used to show that the inner observable completed.
// This is checked during finalization to see if we should
// move to the next item in the buffer, if there is on.
let innerComplete = false;

// Start our inner subscription.
innerFrom(project(value, index++)).subscribe(
new OperatorSubscriber(
Expand All @@ -82,23 +87,41 @@ export function mergeInternals<T, R>(
// Errors are passed to the destination.
undefined,
() => {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
innerSubScheduler ? subscriber.add(innerSubScheduler.schedule(() => doInnerSub(bufferedValue))) : doInnerSub(bufferedValue);
// Flag that we have completed, so we know to check the buffer
// during finalization.
innerComplete = true;
},
() => {
// During finalization, if the inner completed (it wasn't errored or
// cancelled), then we want to try the next item in the buffer if
// there is one.
if (innerComplete) {
// We have to wrap this in a try/catch because it happens during
// finalization, possibly asynchronously, and we want to pass
// any errors that happen (like in a projection function) to
// the outer Subscriber.
try {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
innerSubScheduler ? subscriber.add(innerSubScheduler.schedule(() => doInnerSub(bufferedValue))) : doInnerSub(bufferedValue);
}
// Check to see if we can complete, and complete if so.
checkComplete();
} catch (err) {
subscriber.error(err);
}
}
// Check to see if we can complete, and complete if so.
checkComplete();
}
)
);
Expand All @@ -122,8 +145,6 @@ export function mergeInternals<T, R>(
// Additional teardown (for when the destination is torn down).
// Other teardown is added implicitly via subscription above.
return () => {
// Ensure buffered values are released.
buffer = null!;
additionalTeardown?.();
};
}