Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(forkJoin): the first empty source will now cause an EmptyError #7266

Merged
merged 1 commit into from May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 41 additions & 29 deletions spec/observables/forkJoin-spec.ts
@@ -1,6 +1,6 @@
/** @prettier */
import { expect } from 'chai';
import { finalize, forkJoin, map, of, timer } from 'rxjs';
import { EmptyError, finalize, forkJoin, map, of, timer } from 'rxjs';
import { lowerCaseO } from '../helpers/test-helper';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
Expand Down Expand Up @@ -123,9 +123,9 @@ describe('forkJoin', () => {
const s2 = hot(' (b|)');
const s3 = lowerCaseO();
const e1 = forkJoin([s1, s2, s3]);
const expected = '|';
const expected = '#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

Expand Down Expand Up @@ -158,32 +158,32 @@ describe('forkJoin', () => {
const s2 = hot(' (b|)');
const s3 = hot(' ------------------|');
const e1 = forkJoin([s1, s2, s3]);
const expected = '------------------|';
const expected = '------------------#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

it('should complete early if any of source is empty and completes before than others', () => {
it('should error with EmptyError if any of source is empty and completes before than others', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const s1 = hot(' --a--b--c--d--|');
const s2 = hot(' (b|)');
const s3 = hot(' ---------|');
const e1 = forkJoin([s1, s2, s3]);
const expected = '---------|';
const expected = '---------#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

it('should complete when all sources are empty', () => {
it('should error on the first empty source', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const s1 = hot(' --------------|');
const s2 = hot(' ---------|');
const e1 = forkJoin([s1, s2]);
const expected = '---------|';
const expected = '---------#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

Expand Down Expand Up @@ -212,9 +212,9 @@ describe('forkJoin', () => {
const s1 = hot(' --------------');
const s2 = hot(' ------|');
const e1 = forkJoin([s1, s2]);
const expected = '------|';
const expected = '------#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

Expand Down Expand Up @@ -371,16 +371,16 @@ describe('forkJoin', () => {
});
});

it('should accept empty lowercase-o observables', () => {
it('should error for empty lowercase-o observables', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const e1 = forkJoin({
foo: hot(' --a--b--c--d--|'),
bar: hot(' (b|)'),
baz: lowerCaseO(),
});
const expected = '|';
const expected = '#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

Expand Down Expand Up @@ -409,42 +409,41 @@ describe('forkJoin', () => {
});
});

it('should not emit if any of source observable is empty', () => {
it('should error with EmptyError if any of source observable is empty', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const e1 = forkJoin({
foo: hot(' --a--b--c--d--|'),
bar: hot(' (b|)'),
baz: hot(' ------------------|'),
});
const expected = '------------------|';
const expected = '------------------#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

// TODO: This seems odd. Filed an issue for discussion here: https://github.com/ReactiveX/rxjs/issues/5561
it('should complete early if any of source is empty and completes before than others', () => {
it('should error with EmptyError if any of source is empty and completes before than others', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const e1 = forkJoin({
foo: hot(' --a--b--c--d--|'),
bar: hot(' (b|)'),
baz: hot(' ---------|'),
});
const expected = '---------|';
const expected = '---------#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

it('should complete when all sources are empty', () => {
it('should error when the first source returns that is empty, even if all sources are empty', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const e1 = forkJoin({
foo: hot(' --------------|'),
bar: hot(' ---------|'),
});
const expected = '---------|';
const expected = '---------#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

Expand All @@ -471,15 +470,15 @@ describe('forkJoin', () => {
});
});

it('should complete when one of the sources never completes but other completes without values', () => {
it('should error when one of the sources never completes but other completes without values', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const e1 = forkJoin({
foo: hot(' --------------'),
bar: hot(' ------|'),
});
const expected = '------|';
const expected = '------#';

expectObservable(e1).toBe(expected);
expectObservable(e1).toBe(expected, undefined, new EmptyError());
});
});

Expand Down Expand Up @@ -595,5 +594,18 @@ describe('forkJoin', () => {
},
});
});

it('should error on early completion of an inner observable', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const e1 = forkJoin({
foo: hot(' --a--b--c--d--|'),
bar: hot(' ---|'),
baz: hot(' --1--2--3--|'),
});
const expected = '---#';

expectObservable(e1).toBe(expected, null, new EmptyError());
});
});
});
});
7 changes: 5 additions & 2 deletions src/internal/observable/forkJoin.ts
Expand Up @@ -7,6 +7,7 @@ import { operate } from '../Subscriber';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
import { createObject } from '../util/createObject';
import { AnyCatcher } from '../AnyCatcher';
import { EmptyError } from '../util/EmptyError';

// forkJoin(any)
// We put this first because we need to catch cases where the user has supplied
Expand Down Expand Up @@ -169,10 +170,12 @@ export function forkJoin(...args: any[]): Observable<any> {
complete: () => remainingCompletions--,
finalize: () => {
if (!remainingCompletions || !hasValue) {
if (!remainingEmissions) {
if (remainingEmissions === 0) {
destination.next(keys ? createObject(keys, values) : values);
destination.complete();
} else {
destination.error(new EmptyError());
}
destination.complete();
}
},
})
Expand Down