Skip to content

Commit

Permalink
fix: errors thrown from iterables now properly propagated (#5444)
Browse files Browse the repository at this point in the history
* fix: errors thrown from iterables now properly propagated

* chore: fix lint failures

Co-authored-by: Nicholas Jamieson <nicholas@cartant.com>
  • Loading branch information
benlesh and cartant committed May 18, 2020
1 parent 796c889 commit 2bce0e3
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 13 deletions.
43 changes: 32 additions & 11 deletions spec/operators/mergeMap-spec.ts
@@ -1,14 +1,22 @@
import { expect } from 'chai';
import { mergeMap, map } from 'rxjs/operators';
import { mergeMap, map, delay } from 'rxjs/operators';
import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { asInteropObservable } from '../helpers/interop-helper';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

declare const type: Function;
declare const asDiagram: Function;

/** @test {mergeMap} */
describe('mergeMap', () => {
let rxTest: TestScheduler;

// TODO: Convert the rest of these tests to use run mode!
beforeEach(() => {
rxTest = new TestScheduler(observableMatcher);
});

asDiagram('mergeMap(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014| )')
('should map-and-flatten each item to an Observable', () => {
const e1 = hot('--1-----3--5-------|');
Expand Down Expand Up @@ -821,14 +829,27 @@ describe('mergeMap', () => {
}, 0);
});

type('should support type signatures', () => {
let o: Observable<number>;

/* tslint:disable:no-unused-variable */
let a1: Observable<string> = o.pipe(mergeMap(x => x.toString()));
let a2: Observable<string> = o.pipe(mergeMap(x => x.toString(), 3));
let a3: Observable<{ o: number; i: string; }> = o.pipe(mergeMap(x => x.toString(), (o, i) => ({ o, i })));
let a4: Observable<{ o: number; i: string; }> = o.pipe(mergeMap(x => x.toString(), (o, i) => ({ o, i }), 3));
/* tslint:enable:no-unused-variable */
// NOTE: From https://github.com/ReactiveX/rxjs/issues/5436
it('should properly handle errors from iterables that are processed after some async', () => {
rxTest.run(({ cold, expectObservable }) => {
const noXError = new Error('we do not allow x');
const source = cold('-----A------------B-----|', { A: ['o', 'o', 'o'], B: ['o', 'x', 'o']});
const expected = ' -----(ooo)--------(o#)';
const iterable = function* (data: string[]) {
for (let d of data) {
if (d === 'x') {
throw noXError;
}
yield d;
}
};
const result = source.pipe(
mergeMap(x => of(x).pipe(
delay(0),
mergeMap(iterable)
))
);
expectObservable(result).toBe(expected, undefined, noXError);
});
});
});
22 changes: 22 additions & 0 deletions spec/util/subscribeToResult-spec.ts
Expand Up @@ -119,6 +119,28 @@ describe('subscribeToResult', () => {
expect(expected).to.be.equal(42);
});

// NOTE: From https://github.com/ReactiveX/rxjs/issues/5436
it('should pass along errors from an iterable', () => {
const generator = function* () {
yield 1;
yield 2;
yield 3;
throw 'bad';
};

const results: any[] = [];
let foundError: any = null;

const subscriber = new OuterSubscriber({
next: x => results.push(x),
error: err => foundError = err
});

subscribeToResult(subscriber, generator());
expect(results).to.deep.equal([1, 2, 3]);
expect(foundError).to.equal('bad');
});

it('should subscribe to to an object that implements Symbol.observable', (done) => {
const observableSymbolObject = { [$$symbolObservable]: () => of(42) };

Expand Down
11 changes: 9 additions & 2 deletions src/internal/util/subscribeToIterable.ts
Expand Up @@ -2,9 +2,16 @@ import { Subscriber } from '../Subscriber';
import { iterator as Symbol_iterator } from '../symbol/iterator';

export const subscribeToIterable = <T>(iterable: Iterable<T>) => (subscriber: Subscriber<T>) => {
const iterator = iterable[Symbol_iterator]();
const iterator = (iterable as any)[Symbol_iterator]();

do {
const item = iterator.next();
let item: IteratorResult<T>;
try {
item = iterator.next();
} catch (err) {
subscriber.error(err);
return;
}
if (item.done) {
subscriber.complete();
break;
Expand Down

0 comments on commit 2bce0e3

Please sign in to comment.