Skip to content

Commit

Permalink
refactor(tap): Smaller implementation
Browse files Browse the repository at this point in the history
Also optimizes dynamic calls of tap that might not pass any handlers or observers.
  • Loading branch information
benlesh committed Sep 23, 2020
1 parent 1b07686 commit 1222d5a
Showing 1 changed file with 36 additions and 70 deletions.
106 changes: 36 additions & 70 deletions src/internal/operators/tap.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
/** @prettier */
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction, PartialObserver } from '../types';
import { noop } from '../util/noop';
import { isFunction } from '../util/isFunction';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { identity } from '../util/identity';

/* tslint:disable:max-line-length */
/** @deprecated Use an observer instead of a complete callback */
Expand Down Expand Up @@ -101,81 +104,44 @@ export function tap<T>(observer: PartialObserver<T>): MonoTypeOperatorFunction<T
* @see {@link finalize}
* @see {@link Observable#subscribe}
*
* @param nextOrObserver A next handler or partial observer
* @param observerOrNext A next handler or partial observer
* @param error An error handler
* @param complete A completion handler
*/
export function tap<T>(
nextOrObserver?: PartialObserver<T> | ((x: T) => void) | null,
observerOrNext?: PartialObserver<T> | ((value: T) => void) | null,
error?: ((e: any) => void) | null,
complete?: (() => void) | null
): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let onNext: (value: T) => void;
let onError: (err: any) => void;
let onComplete: () => void;
// We have to check to see not only if next is a function,
// but if error or complete were passed. This is because someone
// could technically call tap like `tap(null, fn)` or `tap(null, null, fn)`.
const tapObserver =
isFunction(observerOrNext) || error || complete ? { next: observerOrNext as (value: T) => void, error, complete } : observerOrNext;

/**
* A helper to ensure that errors thrown in handlers get
* caught and sent do the consumer as an error notification.
*/
const wrap = (fn: any) => (arg?: any) => {
try {
fn(arg);
} catch (err) {
subscriber.error(err);
}
};

if (!nextOrObserver || typeof nextOrObserver === 'function') {
// We have callback functions (or maybe nothing?)

// Bind the next observer to the subscriber. This is an undocumented legacy behavior
// We want to deprecate, but it technically allows for users to call `this.unsubscribe()`
// in the next callback. Again, this is a deprecated, undocumented behavior and we
// do not want to allow this in upcoming versions.
onNext = nextOrObserver ? wrap(nextOrObserver.bind(subscriber)) : noop;

// We don't need to bind the other two callbacks if they exist. There is nothing
// relevant on the subscriber to call during an error or complete callback, as
// it is about to unsubscribe.
onError = error ? wrap(error) : noop;
onComplete = complete ? wrap(complete) : noop;
} else {
// We recieved a partial observer. Make sure the handlers are bound to their
// original parent, and wrap them with the appropriate error handling.
const { next, error, complete } = nextOrObserver;
onNext = next ? wrap(next.bind(nextOrObserver)) : noop;
onError = error ? wrap(error.bind(nextOrObserver)) : noop;
onComplete = complete ? wrap(complete.bind(nextOrObserver)) : noop;
}

source.subscribe(new TapSubscriber(subscriber, onNext, onError, onComplete));
});
}

class TapSubscriber<T> extends Subscriber<T> {
constructor(
destination: Subscriber<T>,
private onNext: (value: T) => void,
private onError: (err: any) => void,
private onComplete: () => void
) {
super(destination);
}

protected _next(value: T) {
this.onNext(value);
super._next(value);
}

protected _error(err: any) {
this.onError(err);
super._error(err);
}

protected _complete() {
this.onComplete();
super._complete();
}
// TODO: Use `operate` function once this PR lands: https://github.com/ReactiveX/rxjs/pull/5742
return tapObserver
? operate((source, subscriber) => {
source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => {
tapObserver.next?.(value);
subscriber.next(value);
},
(err) => {
tapObserver.error?.(err);
subscriber.error(err);
},
() => {
tapObserver.complete?.();
subscriber.complete();
}
)
);
})
: // Tap was called with no valid tap observer or handler
// (e.g. `tap(null, null, null)` or `tap(null)` or `tap()`)
// so we're going to just mirror the source.
identity;
}

0 comments on commit 1222d5a

Please sign in to comment.