Skip to content

Commit

Permalink
feat: add filterBehavior()
Browse files Browse the repository at this point in the history
  • Loading branch information
ersimont committed Apr 3, 2019
1 parent 35857d7 commit 5d4cb15
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 32 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Simonton Software Typescript Utils

S-rxjs-utils is one library in a suite that is available from Simonton Software. Each one builds on the last, organized by their dependencies:
`s-rxjs-utils` is one library in a suite that is available from Simonton Software. Each one builds on the last, organized by their dependencies:

1. [`micro-dash`](https://github.com/simontonsoftware/micro-dash): A much smaller Lodash
1. [`s-js-utils`](https://github.com/simontonsoftware/s-js-utils): Miscellaneous utilities written in TypeScript
Expand Down
33 changes: 16 additions & 17 deletions projects/s-rxjs-utils/src/lib/cache.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BehaviorSubject, Subject } from "rxjs";
import { tap } from "rxjs/operators";
import { expectCallAndReset } from "../test-helpers";
import { cache } from "./cache";

describe("cache()", () => {
Expand All @@ -16,8 +17,8 @@ describe("cache()", () => {
cached.subscribe(next1);
cached.subscribe(next2);

expect(next1).toHaveBeenCalledWith(value);
expect(next2).toHaveBeenCalledWith(value);
expectCallAndReset(next1, value);
expectCallAndReset(next2, value);
});

it("does not run upstream pipe operators for new subscribers", () => {
Expand Down Expand Up @@ -45,16 +46,15 @@ describe("cache()", () => {
cached.subscribe();
cached.subscribe();

expect(upstream).toHaveBeenCalledWith(1);
expect(upstream).toHaveBeenCalledTimes(1);
expectCallAndReset(upstream, 1);
});

it("unsubscribes from the upstream observable", () => {
const source = new Subject();
const cached = source.pipe(cache());
const cached$ = source.pipe(cache());

const sub1 = cached.subscribe();
const sub2 = cached.subscribe();
const sub1 = cached$.subscribe();
const sub2 = cached$.subscribe();
expect(source.observers.length).toBe(1);

sub1.unsubscribe();
Expand All @@ -71,20 +71,19 @@ describe("cache()", () => {
const next2 = jasmine.createSpy();

const sub = cached.subscribe(next1);
expect(next1).toHaveBeenCalledWith(1);
expectCallAndReset(next1, 1);
sub.unsubscribe();

expect(source.observers.length).toBe(0);
source.next(2);

cached.subscribe(next2);
expect(next1).not.toHaveBeenCalledWith(2);
expect(next2).not.toHaveBeenCalledWith(1);
expect(next2).toHaveBeenCalledWith(2);
expect(next1).not.toHaveBeenCalled();
expectCallAndReset(next2, 2);

source.next(3);
expect(next1).not.toHaveBeenCalledWith(3);
expect(next2).toHaveBeenCalledWith(3);
expect(next1).not.toHaveBeenCalled();
expectCallAndReset(next2, 3);
});

it("passes along errors", () => {
Expand All @@ -98,8 +97,8 @@ describe("cache()", () => {
cached.subscribe(undefined, error2);
source.error(err);

expect(error1).toHaveBeenCalledWith(err);
expect(error2).toHaveBeenCalledWith(err);
expectCallAndReset(error1, err);
expectCallAndReset(error2, err);
});

it("passes along completion", () => {
Expand All @@ -112,7 +111,7 @@ describe("cache()", () => {
cached.subscribe(undefined, undefined, complete2);
source.complete();

expect(complete1).toHaveBeenCalledWith();
expect(complete2).toHaveBeenCalledWith();
expectCallAndReset(complete1);
expectCallAndReset(complete2);
});
});
4 changes: 2 additions & 2 deletions projects/s-rxjs-utils/src/lib/create-pipeable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { SubscriptionManager } from "./subscription-manager";
*
* A simple example, recreating the "map" operator:
* ```ts
* function map<I, O>(fn: (input: I) => O) {
* return createPipeable<I, O>(
* function map<U, D>(fn: (input: U) => D) {
* return createPipeable<U, D>(
* (upstream, downstream, subscriptionManager) => {
* subscriptionManager.subscribeTo(
* upstream,
Expand Down
102 changes: 102 additions & 0 deletions projects/s-rxjs-utils/src/lib/filter-behavior.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { Subject } from "rxjs";
import { expectCallAndReset } from "../test-helpers";
import { filterBehavior } from "./filter-behavior";

describe("filterBehavior()", () => {
it("filters items based on the supplied predicate", () => {
const source = new Subject();
const predicate = jasmine.createSpy();
const next = jasmine.createSpy();
source.pipe(filterBehavior(predicate)).subscribe(next);

source.next(1);
expectCallAndReset(next, 1);

predicate.and.returnValue(true);
source.next(2);
expectCallAndReset(next, 2);
source.next(3);
expectCallAndReset(next, 3);

predicate.and.returnValue(false);
source.next(4);
source.next(5);
expect(next).not.toHaveBeenCalled();

predicate.and.returnValue(true);
source.next(6);
expectCallAndReset(next, 6);
});

it("emits the first value unconditionally for each subscriber", () => {
const source = new Subject();
const predicate = jasmine.createSpy().and.returnValue(false);
const filtered$ = source.pipe(filterBehavior(predicate));

const next1 = jasmine.createSpy();
filtered$.subscribe(next1);
source.next(1);
source.next(2);
source.next(3);

const next2 = jasmine.createSpy();
filtered$.subscribe(next2);
source.next(4);
source.next(5);
source.next(6);

const next3 = jasmine.createSpy();
filtered$.subscribe(next3);
source.next(7);
source.next(8);
source.next(9);

expectCallAndReset(next1, 1);
expectCallAndReset(next2, 4);
expectCallAndReset(next3, 7);
});

it("unsubscribes from upstream observable", () => {
const source = new Subject();
const filtered$ = source.pipe(filterBehavior(() => true));

const sub1 = filtered$.subscribe();
const sub2 = filtered$.subscribe();
expect(source.observers.length).toBe(2);

sub1.unsubscribe();
expect(source.observers.length).toBe(1);

sub2.unsubscribe();
expect(source.observers.length).toBe(0);
});

it("passes along errors", () => {
const source = new Subject();
const cached = source.pipe(filterBehavior(() => false));
const error1 = jasmine.createSpy();
const error2 = jasmine.createSpy();
const err = Symbol();

cached.subscribe(undefined, error1);
cached.subscribe(undefined, error2);
source.error(err);

expect(error1).toHaveBeenCalledWith(err);
expect(error2).toHaveBeenCalledWith(err);
});

it("passes along completion", () => {
const source = new Subject();
const cached = source.pipe(filterBehavior(() => false));
const complete1 = jasmine.createSpy();
const complete2 = jasmine.createSpy();

cached.subscribe(undefined, undefined, complete1);
cached.subscribe(undefined, undefined, complete2);
source.complete();

expectCallAndReset(complete1);
expectCallAndReset(complete2);
});
});
31 changes: 31 additions & 0 deletions projects/s-rxjs-utils/src/lib/filter-behavior.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { MonoTypeOperatorFunction } from "rxjs";
import { createPipeable } from "./create-pipeable";

/**
* Works like `filter()`, but always lets through the first emission for each new subscriber. This makes it suitable for subscribers that expect the observable to behave like a `BehaviorSubject`, where the first emission is processed synchronously during the call to `subscribe()` (such as the `async` pipe in an Angular template).
*
* ```
* source: |-false--true--false--true--false--true-|
* filterBehavior(identity): |-false--true---------true---------true-|
* filterBehavior(identity): |-true---------true---------true-|
* filterBehavior(identity): |-false--true---------true-|
* ```
*/
export function filterBehavior<T>(
predicate: (value: T) => boolean,
): MonoTypeOperatorFunction<T> {
return createPipeable<T, T>((upstream$, downstream, manager) => {
let firstValue = true;
manager.subscribeTo(
upstream$,
(value) => {
if (firstValue || predicate(value)) {
downstream.next(value);
}
firstValue = false;
},
downstream.error.bind(downstream),
downstream.complete.bind(downstream),
);
});
}
20 changes: 8 additions & 12 deletions projects/s-rxjs-utils/src/lib/skip-after.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Subject } from "rxjs";
import { expectCallAndReset } from "../test-helpers";
import { skipAfter } from "./skip-after";

describe("skipAfter()", () => {
Expand All @@ -9,16 +10,14 @@ describe("skipAfter()", () => {
source.pipe(skipAfter(skip$)).subscribe(next);

source.next(1);
expect(next).toHaveBeenCalledTimes(1);
expect(next).toHaveBeenCalledWith(1);
expectCallAndReset(next, 1);

skip$.next();
source.next(2);
expect(next).toHaveBeenCalledTimes(1);
expect(next).not.toHaveBeenCalled();

source.next(3);
expect(next).toHaveBeenCalledTimes(2);
expect(next).toHaveBeenCalledWith(3);
expectCallAndReset(next, 3);
});

it("only skips one emission even if called multiple times", () => {
Expand All @@ -28,17 +27,15 @@ describe("skipAfter()", () => {
source.pipe(skipAfter(skip$)).subscribe(next);

source.next(1);
expect(next).toHaveBeenCalledTimes(1);
expect(next).toHaveBeenCalledWith(1);
expectCallAndReset(next, 1);

skip$.next();
skip$.next();
source.next(2);
expect(next).toHaveBeenCalledTimes(1);
expect(next).not.toHaveBeenCalled();

source.next(3);
expect(next).toHaveBeenCalledTimes(2);
expect(next).toHaveBeenCalledWith(3);
expectCallAndReset(next, 3);
});

it("handles completions", () => {
Expand Down Expand Up @@ -72,8 +69,7 @@ describe("skipAfter()", () => {

expect(skip$.observers.length).toBe(0);
expect(upstream$.observers.length).toBe(0);
expect(error).toHaveBeenCalledTimes(1);
expect(error).toHaveBeenCalledWith("the error");
expectCallAndReset(error, "the error");
});

it("handles unsubscribes", () => {
Expand Down
1 change: 1 addition & 0 deletions projects/s-rxjs-utils/src/public_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

export { cache } from "./lib/cache";
export { createPipeable } from "./lib/create-pipeable";
export { filterBehavior } from "./lib/filter-behavior";
export { skipAfter } from "./lib/skip-after";
export { SubscriptionManager } from "./lib/subscription-manager";
export { withHistory } from "./lib/with-history";
7 changes: 7 additions & 0 deletions projects/s-rxjs-utils/src/test-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,10 @@ export function pipeAndCollect<I, O>(
)
.toPromise();
}

// consider for s-js-utils
export function expectCallAndReset(spy: jasmine.Spy, ...params: any[]) {
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith(...params);
spy.calls.reset();
}
2 changes: 2 additions & 0 deletions src/app/app.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Subject } from "rxjs";
import {
cache,
createPipeable,
filterBehavior,
skipAfter,
SubscriptionManager,
withHistory,
Expand All @@ -25,6 +26,7 @@ export class AppComponent {
createPipeable(noop),
skipAfter(new Subject()),
withHistory(3),
filterBehavior(() => true),
),
);

Expand Down

0 comments on commit 5d4cb15

Please sign in to comment.