Skip to content

Commit

Permalink
feat: improve handling of errors thrown by passed-in functions
Browse files Browse the repository at this point in the history
  • Loading branch information
ersimont committed Jun 5, 2019
1 parent b5851cb commit 954bf5b
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 45 deletions.
35 changes: 34 additions & 1 deletion projects/s-rxjs-utils/src/lib/operators/filter-behavior.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Subject } from "rxjs";
import { BehaviorSubject, Subject } from "rxjs";
import { catchError } from "rxjs/operators";
import {
subscribeWithStubs,
testCompletionPropagation,
Expand Down Expand Up @@ -57,6 +58,38 @@ describe("filterBehavior()", () => {
sub3.expectReceivedOnlyValue(7);
});

it("handles the predicate throwing an error", () => {
const ex = new Error();
const thrower = () => {
throw ex;
};
const source = new Subject();
const sub1 = subscribeWithStubs(source.pipe(filterBehavior(thrower)));
const sub2 = subscribeWithStubs(source.pipe(filterBehavior(thrower)));
const sub3 = subscribeWithStubs(
source.pipe(
filterBehavior(thrower),
catchError(() => new BehaviorSubject(-1)),
),
);

expect(source.observers.length).toBe(3);
sub1.expectNoCalls();
sub2.expectNoCalls();
sub3.expectNoCalls();

source.next(1);
sub1.expectReceivedOnlyValue(1);
sub2.expectReceivedOnlyValue(1);
sub3.expectReceivedOnlyValue(1);

source.next(2);
expect(source.observers.length).toBe(0);
sub1.expectReceivedOnlyError(ex);
sub2.expectReceivedOnlyError(ex);
sub3.expectReceivedOnlyValue(-1);
});

it("passes along unsubscribes", () => {
testUnsubscribePropagation(() => filterBehavior(() => true));
});
Expand Down
11 changes: 10 additions & 1 deletion projects/s-rxjs-utils/src/lib/operators/filter-behavior.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,18 @@ export function filterBehavior<T>(predicate: Predicate<T>) {
return createOperatorFunction<T>((subscriber, destination) => {
let firstValue = true;
subscriber.next = (value) => {
if (firstValue || predicate(value)) {
if (firstValue) {
destination.next(value);
firstValue = false;
return;
}

try {
if (predicate(value)) {
destination.next(value);
}
} catch (ex) {
destination.error(ex);
}
};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
testCompletionPropagation,
testErrorPropagation,
testUnsubscribePropagation,
testUserFunctionError,
} from "../../test-helpers/misc-helpers";
import { mapAndCacheArrayElements } from "./map-and-cache-array-elements";

Expand Down Expand Up @@ -102,6 +103,20 @@ describe("mapAndCacheArrayElements()", () => {
}
});

it("handles `buildCacheKey` throwing an error", () => {
testUserFunctionError(
(thrower) => mapAndCacheArrayElements(thrower, identity),
[1],
);
});

it("handles `buildDownstreamType` throwing an error", () => {
testUserFunctionError(
(thrower) => mapAndCacheArrayElements(identity, thrower),
[1],
);
});

it("passes along unsubscribes", () => {
testUnsubscribePropagation(() =>
mapAndCacheArrayElements(identity, identity),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import {
testCompletionPropagation,
testErrorPropagation,
testUnsubscribePropagation,
testUserFunctionError,
} from "../../test-helpers/misc-helpers";
import { mapAndCacheArrayElements } from "./map-and-cache-array-elements";
import { mapAndCacheObjectElements } from "./map-and-cache-object-elements";

describe("mapAndCacheObjectElements()", () => {
Expand Down Expand Up @@ -102,6 +104,20 @@ describe("mapAndCacheObjectElements()", () => {
}
});

it("handles `buildCacheKey` throwing an error", () => {
testUserFunctionError(
(thrower) => mapAndCacheArrayElements(thrower, identity),
{ a: 1 },
);
});

it("handles `buildDownstreamType` throwing an error", () => {
testUserFunctionError(
(thrower) => mapAndCacheArrayElements(identity, thrower),
{ a: 1 },
);
});

it("passes along unsubscribes", () => {
testUnsubscribePropagation(() =>
mapAndCacheObjectElements(identity, identity),
Expand Down
81 changes: 38 additions & 43 deletions projects/s-rxjs-utils/src/test-helpers/misc-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { Observable, of, OperatorFunction, Subject } from "rxjs";
import { toArray } from "rxjs/operators";
import {
BehaviorSubject,
Observable,
of,
OperatorFunction,
Subject,
} from "rxjs";
import { catchError, toArray } from "rxjs/operators";
import { StubbedSubscriber } from "./stubbed-subscriber";

export async function expectPipeResult<I, O>(
Expand All @@ -22,47 +28,36 @@ export function pipeAndCollect<I, O>(
.toPromise();
}

// export function testUserFunctionError(
// buildOperator: (userFn: () => never) => OperatorFunction<any, any>,
// ) {
// const ex = new Error();
// const thrower = () => {
// throw ex;
// };
// const source = new Subject();
// const next1 = jasmine.createSpy();
// const next2 = jasmine.createSpy();
// const next3 = jasmine.createSpy();
// const error1 = jasmine.createSpy();
// const error2 = jasmine.createSpy();
// const error3 = jasmine.createSpy();
// source.pipe(buildOperator(thrower)).subscribe(next1, error1);
// source.pipe(buildOperator(thrower)).subscribe(next2, error2);
// source
// .pipe(
// buildOperator(thrower),
// catchError(() => of(1)),
// )
// .subscribe(next3, error3);
//
// expect(source.observers.length).toBe(3);
// expect(next1).not.toHaveBeenCalled();
// expect(next2).not.toHaveBeenCalled();
// expect(next3).not.toHaveBeenCalled();
// expect(error1).not.toHaveBeenCalled();
// expect(error2).not.toHaveBeenCalled();
// expect(error3).not.toHaveBeenCalled();
//
// source.next(1);
//
// expect(source.observers.length).toBe(0);
// expect(next1).not.toHaveBeenCalled();
// expect(next2).not.toHaveBeenCalled();
// expectSingleCallAndReset(next3, 1);
// expectSingleCallAndReset(error1, ex);
// expectSingleCallAndReset(error2, ex);
// expect(error3).not.toHaveBeenCalled();
// }
export function testUserFunctionError(
buildOperator: (thrower: () => never) => OperatorFunction<any, any>,
upstreamValue: any = 1,
) {
const ex = new Error();
const thrower = () => {
throw ex;
};
const source = new Subject();
const sub1 = subscribeWithStubs(source.pipe(buildOperator(thrower)));
const sub2 = subscribeWithStubs(source.pipe(buildOperator(thrower)));
const sub3 = subscribeWithStubs(
source.pipe(
buildOperator(thrower),
catchError(() => new BehaviorSubject(-1)),
),
);

expect(source.observers.length).toBe(3);
sub1.expectNoCalls();
sub2.expectNoCalls();
sub3.expectNoCalls();

source.next(upstreamValue);

expect(source.observers.length).toBe(0);
sub1.expectReceivedOnlyError(ex);
sub2.expectReceivedOnlyError(ex);
sub3.expectReceivedOnlyValue(-1);
}

export function testUnsubscribePropagation(
buildOperator: () => OperatorFunction<any, any>,
Expand Down

0 comments on commit 954bf5b

Please sign in to comment.