diff --git a/projects/s-rxjs-utils/src/lib/operators/filter-behavior.spec.ts b/projects/s-rxjs-utils/src/lib/operators/filter-behavior.spec.ts index 90dea2f..0845a53 100644 --- a/projects/s-rxjs-utils/src/lib/operators/filter-behavior.spec.ts +++ b/projects/s-rxjs-utils/src/lib/operators/filter-behavior.spec.ts @@ -1,4 +1,5 @@ -import { Subject } from "rxjs"; +import { BehaviorSubject, Subject } from "rxjs"; +import { catchError } from "rxjs/operators"; import { subscribeWithStubs, testCompletionPropagation, @@ -57,6 +58,38 @@ describe("filterBehavior()", () => { sub3.expectReceivedOnlyValue(7); }); + it("handles the predicate throwing an error", () => { + const ex = new Error(); + const thrower = () => { + throw ex; + }; + const source = new Subject(); + const sub1 = subscribeWithStubs(source.pipe(filterBehavior(thrower))); + const sub2 = subscribeWithStubs(source.pipe(filterBehavior(thrower))); + const sub3 = subscribeWithStubs( + source.pipe( + filterBehavior(thrower), + catchError(() => new BehaviorSubject(-1)), + ), + ); + + expect(source.observers.length).toBe(3); + sub1.expectNoCalls(); + sub2.expectNoCalls(); + sub3.expectNoCalls(); + + source.next(1); + sub1.expectReceivedOnlyValue(1); + sub2.expectReceivedOnlyValue(1); + sub3.expectReceivedOnlyValue(1); + + source.next(2); + expect(source.observers.length).toBe(0); + sub1.expectReceivedOnlyError(ex); + sub2.expectReceivedOnlyError(ex); + sub3.expectReceivedOnlyValue(-1); + }); + it("passes along unsubscribes", () => { testUnsubscribePropagation(() => filterBehavior(() => true)); }); diff --git a/projects/s-rxjs-utils/src/lib/operators/filter-behavior.ts b/projects/s-rxjs-utils/src/lib/operators/filter-behavior.ts index 6a75b0a..9df42d9 100644 --- a/projects/s-rxjs-utils/src/lib/operators/filter-behavior.ts +++ b/projects/s-rxjs-utils/src/lib/operators/filter-behavior.ts @@ -15,9 +15,18 @@ export function filterBehavior(predicate: Predicate) { return createOperatorFunction((subscriber, destination) => { let firstValue = true; subscriber.next = (value) => { - if (firstValue || predicate(value)) { + if (firstValue) { destination.next(value); firstValue = false; + return; + } + + try { + if (predicate(value)) { + destination.next(value); + } + } catch (ex) { + destination.error(ex); } }; }); diff --git a/projects/s-rxjs-utils/src/lib/operators/map-and-cache-array-elements.spec.ts b/projects/s-rxjs-utils/src/lib/operators/map-and-cache-array-elements.spec.ts index c8e613c..aaa829a 100644 --- a/projects/s-rxjs-utils/src/lib/operators/map-and-cache-array-elements.spec.ts +++ b/projects/s-rxjs-utils/src/lib/operators/map-and-cache-array-elements.spec.ts @@ -6,6 +6,7 @@ import { testCompletionPropagation, testErrorPropagation, testUnsubscribePropagation, + testUserFunctionError, } from "../../test-helpers/misc-helpers"; import { mapAndCacheArrayElements } from "./map-and-cache-array-elements"; @@ -102,6 +103,20 @@ describe("mapAndCacheArrayElements()", () => { } }); + it("handles `buildCacheKey` throwing an error", () => { + testUserFunctionError( + (thrower) => mapAndCacheArrayElements(thrower, identity), + [1], + ); + }); + + it("handles `buildDownstreamType` throwing an error", () => { + testUserFunctionError( + (thrower) => mapAndCacheArrayElements(identity, thrower), + [1], + ); + }); + it("passes along unsubscribes", () => { testUnsubscribePropagation(() => mapAndCacheArrayElements(identity, identity), diff --git a/projects/s-rxjs-utils/src/lib/operators/map-and-cache-object-elements.spec.ts b/projects/s-rxjs-utils/src/lib/operators/map-and-cache-object-elements.spec.ts index 0dbabf5..083835b 100644 --- a/projects/s-rxjs-utils/src/lib/operators/map-and-cache-object-elements.spec.ts +++ b/projects/s-rxjs-utils/src/lib/operators/map-and-cache-object-elements.spec.ts @@ -6,7 +6,9 @@ import { testCompletionPropagation, testErrorPropagation, testUnsubscribePropagation, + testUserFunctionError, } from "../../test-helpers/misc-helpers"; +import { mapAndCacheArrayElements } from "./map-and-cache-array-elements"; import { mapAndCacheObjectElements } from "./map-and-cache-object-elements"; describe("mapAndCacheObjectElements()", () => { @@ -102,6 +104,20 @@ describe("mapAndCacheObjectElements()", () => { } }); + it("handles `buildCacheKey` throwing an error", () => { + testUserFunctionError( + (thrower) => mapAndCacheArrayElements(thrower, identity), + { a: 1 }, + ); + }); + + it("handles `buildDownstreamType` throwing an error", () => { + testUserFunctionError( + (thrower) => mapAndCacheArrayElements(identity, thrower), + { a: 1 }, + ); + }); + it("passes along unsubscribes", () => { testUnsubscribePropagation(() => mapAndCacheObjectElements(identity, identity), diff --git a/projects/s-rxjs-utils/src/test-helpers/misc-helpers.ts b/projects/s-rxjs-utils/src/test-helpers/misc-helpers.ts index a11f90f..72595d7 100644 --- a/projects/s-rxjs-utils/src/test-helpers/misc-helpers.ts +++ b/projects/s-rxjs-utils/src/test-helpers/misc-helpers.ts @@ -1,5 +1,11 @@ -import { Observable, of, OperatorFunction, Subject } from "rxjs"; -import { toArray } from "rxjs/operators"; +import { + BehaviorSubject, + Observable, + of, + OperatorFunction, + Subject, +} from "rxjs"; +import { catchError, toArray } from "rxjs/operators"; import { StubbedSubscriber } from "./stubbed-subscriber"; export async function expectPipeResult( @@ -22,47 +28,36 @@ export function pipeAndCollect( .toPromise(); } -// export function testUserFunctionError( -// buildOperator: (userFn: () => never) => OperatorFunction, -// ) { -// const ex = new Error(); -// const thrower = () => { -// throw ex; -// }; -// const source = new Subject(); -// const next1 = jasmine.createSpy(); -// const next2 = jasmine.createSpy(); -// const next3 = jasmine.createSpy(); -// const error1 = jasmine.createSpy(); -// const error2 = jasmine.createSpy(); -// const error3 = jasmine.createSpy(); -// source.pipe(buildOperator(thrower)).subscribe(next1, error1); -// source.pipe(buildOperator(thrower)).subscribe(next2, error2); -// source -// .pipe( -// buildOperator(thrower), -// catchError(() => of(1)), -// ) -// .subscribe(next3, error3); -// -// expect(source.observers.length).toBe(3); -// expect(next1).not.toHaveBeenCalled(); -// expect(next2).not.toHaveBeenCalled(); -// expect(next3).not.toHaveBeenCalled(); -// expect(error1).not.toHaveBeenCalled(); -// expect(error2).not.toHaveBeenCalled(); -// expect(error3).not.toHaveBeenCalled(); -// -// source.next(1); -// -// expect(source.observers.length).toBe(0); -// expect(next1).not.toHaveBeenCalled(); -// expect(next2).not.toHaveBeenCalled(); -// expectSingleCallAndReset(next3, 1); -// expectSingleCallAndReset(error1, ex); -// expectSingleCallAndReset(error2, ex); -// expect(error3).not.toHaveBeenCalled(); -// } +export function testUserFunctionError( + buildOperator: (thrower: () => never) => OperatorFunction, + upstreamValue: any = 1, +) { + const ex = new Error(); + const thrower = () => { + throw ex; + }; + const source = new Subject(); + const sub1 = subscribeWithStubs(source.pipe(buildOperator(thrower))); + const sub2 = subscribeWithStubs(source.pipe(buildOperator(thrower))); + const sub3 = subscribeWithStubs( + source.pipe( + buildOperator(thrower), + catchError(() => new BehaviorSubject(-1)), + ), + ); + + expect(source.observers.length).toBe(3); + sub1.expectNoCalls(); + sub2.expectNoCalls(); + sub3.expectNoCalls(); + + source.next(upstreamValue); + + expect(source.observers.length).toBe(0); + sub1.expectReceivedOnlyError(ex); + sub2.expectReceivedOnlyError(ex); + sub3.expectReceivedOnlyValue(-1); +} export function testUnsubscribePropagation( buildOperator: () => OperatorFunction,