Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding operate #7264

Merged
merged 65 commits into from May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
c76d90e
feat(operate): Adds new subscriber chaining mechanism for operator cr…
benlesh May 16, 2023
dcd8ce9
feat(Subscriber): constructing `new Subscriber` is deprecated
benlesh May 16, 2023
59fd2e5
docs(operate): Add documentation.
benlesh May 16, 2023
fb193e4
refactor(catchError): use new `operate` function.
benlesh May 16, 2023
f8eaf6c
refactor(takeLast): Implement with new `operate` function.
benlesh May 16, 2023
45033fc
refactor(audit): use new `operate` function
benlesh May 16, 2023
306c8c9
refactor(buffer): use new `operate` function
benlesh May 16, 2023
8a530a3
refactor(bufferCount): use new `operate` function
benlesh May 16, 2023
9f61b67
refactor(bufferTime): use new `operate` function
benlesh May 16, 2023
74fa752
refactor(bufferToggle): use new `operate` function.
benlesh May 16, 2023
ffa85b4
refactor(bufferWhen): use new `operate` function
benlesh May 16, 2023
2d89d96
refactor(combineLatest): use new `operate` function.
benlesh May 16, 2023
deb5499
refactor(merge/concat): Use new `operate` function.
benlesh May 16, 2023
055a14b
refactor(debounce): use new `operate` function.
benlesh May 16, 2023
aa27830
refactor(debounceTime): use new `operate` function
benlesh May 16, 2023
c80ecad
refactor(defaultIfEmpty): use new `operate` function.
benlesh May 16, 2023
f893852
refactor(dematerialize): use new `operate` function
benlesh May 16, 2023
9bc1396
refactor(distinct): use new `operate` function.
benlesh May 16, 2023
2e3f986
refactor(distinctUntilChanged): use new `operate` function.
benlesh May 16, 2023
43eab50
refactor(endWith): use new `operate` function
benlesh May 16, 2023
bd74d5e
refactor(every): use new `operate` function.
benlesh May 16, 2023
1bb59e0
refactor(exhaustMap): use new `operate` function.
benlesh May 16, 2023
143dd4c
refactor(filter): use new `operate` function.
benlesh May 16, 2023
1f17b25
refactor(find): use new `operate` function
benlesh May 16, 2023
83da1fc
refactor(groupBy): use new `operate` function
benlesh May 16, 2023
7e7f311
refactor(ignoreElements): use new `operate` function.
benlesh May 16, 2023
3c55b14
refactor(isEmpty): use new `operate` function.
benlesh May 16, 2023
c649d92
refactor(materialize): use new `operate` function.
benlesh May 16, 2023
c1d41c5
refactor(observeOn): use new `operate` function.
benlesh May 16, 2023
0a5ee0d
refactor(pairwise): use new `operate` function.
benlesh May 16, 2023
8dcaee0
refactor(race/raceWith): use new `operate` function.
benlesh May 16, 2023
8fa5341
refactor(onErrorResumeNext): use new `operate` function.
benlesh May 16, 2023
27431a7
refactor(reduce/scan): use new `operate` function.
benlesh May 16, 2023
2d35167
refactor(repeat): use new `operate` function.
benlesh May 16, 2023
9cbe7a4
refactor(repeatWhen): use new `operate` function.
benlesh May 16, 2023
d8eb8e5
refactor(retry): use new `operate` function.
benlesh May 16, 2023
94de33e
refactor(retryWhen): use new `operate` function.
benlesh May 16, 2023
28c193e
refactor(sample): use new `operate` function.
benlesh May 16, 2023
c8b77c6
refactor(sequenceEqual): use new `operate` function.
benlesh May 16, 2023
204eede
refactor(single): use new `operate` function.
benlesh May 16, 2023
0230311
refactor(skipLast): use new `operate` function.
benlesh May 16, 2023
6b6f6a8
refactor(skipUntil): use new `operate` function.
benlesh May 16, 2023
631b532
refactor(skipWhile): use new `operate` function.
benlesh May 16, 2023
323f661
refactor(switchMap): use new `operate` function.
benlesh May 16, 2023
f33acd8
refactor(switchScan): use new `operate` function.
benlesh May 16, 2023
134bca4
refactor(take): use new `operate` function.
benlesh May 16, 2023
48a47dd
refactor(takeUntil): use new `operate` function.
benlesh May 16, 2023
e694edf
refactor(takeWhile): use new `operate` function
benlesh May 16, 2023
81a55f0
refactor(tap): use new `operate` function
benlesh May 16, 2023
96ce7ac
refactor(throttle): use new `operate` function
benlesh May 16, 2023
9fcc78d
refactor(throwIfEmpty): use new `operate` function
benlesh May 16, 2023
5d5b05e
refactor(timeInterval): use new `operate` function
benlesh May 16, 2023
4be4be3
refactor(timeout): use new `operate` function
benlesh May 16, 2023
31a8a4b
refactor(window): use new `operate` function.
benlesh May 16, 2023
3e2aac4
refactor(windowCount): use new `operate` function.
benlesh May 16, 2023
688f1d0
refactor(windowTime): use new `operate` function.
benlesh May 16, 2023
6c2b191
refactor(windowToggle): use new `operate` function.
benlesh May 16, 2023
aafc64a
refactor(windowWhen): use new `operate` function.
benlesh May 16, 2023
bc5648f
refactor(withLatestFrom): use new `operate` function.
benlesh May 16, 2023
2aec520
refactor(zip): use new `operate` function.
benlesh May 16, 2023
84ba156
refactor(forkJoin): use new `operate` function.
benlesh May 16, 2023
eef03e2
refactor(fetch): use new `operate` function.
benlesh May 16, 2023
4bee713
test(Subject): update generic operator test to use `operate` function.
benlesh May 16, 2023
e7937c7
refactor: Delete `OperatorSubscriber`, no longer used.
benlesh May 16, 2023
564cedc
chore: Resolve review comments
benlesh May 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions spec-dtslint/Subscriber-spec.ts
@@ -0,0 +1,11 @@
import { Subscriber } from "rxjs";

describe("Subscriber", () => {
it('should have deprecated and internal constructors', () => {
const s1 = new Subscriber(); // $ExpectDeprecation
const s2 = new Subscriber(() => {}); // $ExpectDeprecation
const s3 = new Subscriber({}); // $ExpectDeprecation
const s4 = new Subscriber({ next: () => {}}); // $ExpectDeprecation
const s5 = new Subscriber({ }, { next: () => {} }); // $ExpectError
});
});
20 changes: 12 additions & 8 deletions spec/Subject-spec.ts
@@ -1,10 +1,9 @@
import { expect } from 'chai';
import { Subject, Observable, AsyncSubject, Observer, of, config, Subscription, Subscriber, noop } from 'rxjs';
import { Subject, Observable, AsyncSubject, Observer, of, config, Subscription, Subscriber, noop, operate } from 'rxjs';
import { AnonymousSubject } from 'rxjs/internal/Subject';
import { delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';
import { createOperatorSubscriber } from 'rxjs/internal/operators/OperatorSubscriber';

/** @test {Subject} */
describe('Subject', () => {
Expand Down Expand Up @@ -729,15 +728,20 @@ describe('Subject', () => {
});
});

it('should behave properly when subscribed to more than once by the same OperatorSubscriber', () => {
it('should behave properly when subscribed to more than once by the same operator subscriber returned by `operate`', () => {
const subject = new Subject<number>();
const destination = new Subscriber();
const results: any[] = [];
const subscriber = createOperatorSubscriber(destination, (value) => {
results.push(value);
}, () => {
results.push('complete');
}, noop);
const subscriber = operate({
destination,
next: (value) => {
results.push(value);
},
error: noop,
complete: () => {
results.push('complete');
},
});

subject.subscribe(subscriber);
subject.subscribe(subscriber);
Expand Down
130 changes: 129 additions & 1 deletion spec/Subscriber-spec.ts
@@ -1,5 +1,6 @@
import { expect } from 'chai';
import { Subscriber, Observable, of, Observer, config } from 'rxjs';
import { Subscriber, Observable, of, Observer, config, operate } from 'rxjs';
import * as sinon from 'sinon';
import { asInteropSubscriber } from './helpers/interop-helper';
import { getRegisteredFinalizers } from './helpers/subscription';

Expand Down Expand Up @@ -310,4 +311,131 @@ describe('Subscriber', () => {
} else {
console.warn(`No support for FinalizationRegistry in Node ${process.version}`);
}
});

describe('operate', () => {
it('should create a Subscriber that passes next calls through to the destination by default', () => {
const next = sinon.spy();
const destination = new Subscriber<string>({
next,
});

const subscriber = operate({
destination,
})

subscriber.next('foo');
expect(next).to.have.been.calledOnceWithExactly('foo');
});

it('should catch any errors in the next override, and pass them to the error handler on the destination', () => {
const error = sinon.spy();
const destination = new Subscriber<string>({
error,
});

const subscriber = operate({
destination,
next() {
throw 'boop!'
},
})

subscriber.next('foo');
expect(error).to.have.been.calledOnceWithExactly('boop!');
});

it('should pass errors passed to the result through to the destination error handler by default', () => {
const error = sinon.spy();
const finalizer = sinon.spy();

const destination = new Subscriber<string>({
error,
});
destination.add(finalizer);

const subscriber = operate({
destination,
})

subscriber.error('boop!');
expect(error).to.have.been.calledOnceWithExactly('boop!');
expect(finalizer).to.have.been.calledOnce;
});

it('should pass errors that are thrown by the error override through to the destination, and finalize the subscriber', () => {
const error = sinon.spy();
const finalizer = sinon.spy();

const destination = new Subscriber<string>({
error,
});
destination.add(finalizer);

const subscriber = operate({
destination,
error() {
throw 'boop!'
}
});

subscriber.error('no boop for you');
expect(error).to.have.been.calledOnceWithExactly('boop!');
expect(finalizer).to.have.been.calledOnce;
});

it('should pass complete calls through to the destination by default', () => {
const complete = sinon.spy();
const finalizer = sinon.spy();

const destination = new Subscriber<string>({
complete,
});
destination.add(finalizer);

const subscriber = operate({
destination,
});

subscriber.complete();
expect(complete).to.have.been.calledOnce;
expect(finalizer).to.have.been.calledOnce;
});

it('should catch any errors in the complete override and pass them to the destination complete handler and finalize', () => {
const complete = sinon.spy();
const finalizer = sinon.spy();
const error = sinon.spy();

const destination = new Subscriber<string>({
complete,
error,
});
destination.add(finalizer);

const subscriber = operate({
destination,
complete() {
throw 'boop!'
}
});

subscriber.complete();
expect(complete).not.to.have.been.called;
benlesh marked this conversation as resolved.
Show resolved Hide resolved
expect(finalizer).to.have.been.calledOnce;
expect(error).to.have.been.calledOnceWithExactly('boop!');
});

it('should return a subscriber that, when unsubscribed, will finalize the destination', () => {
const finalizer = sinon.spy();
const destination = new Subscriber<string>({});
destination.add(finalizer);

const subscriber = operate({
destination,
});

destination.unsubscribe();
expect(finalizer).to.have.been.calledOnce;
});
});
3 changes: 2 additions & 1 deletion src/index.ts
Expand Up @@ -35,13 +35,14 @@ export { Scheduler } from './internal/Scheduler';

/* Subscription */
export { Subscription } from './internal/Subscription';
export { Subscriber } from './internal/Subscriber';
export { Subscriber, SubscriberOverrides } from './internal/Subscriber';

/* Utils */
export { pipe } from './internal/util/pipe';
export { noop } from './internal/util/noop';
export { identity } from './internal/util/identity';
export { isObservable } from './internal/util/isObservable';
export { operate } from './internal/Subscriber';

/* Promise Conversion */
export { lastValueFrom } from './internal/lastValueFrom';
Expand Down
140 changes: 137 additions & 3 deletions src/internal/Subscriber.ts
Expand Up @@ -6,6 +6,37 @@ import { reportUnhandledError } from './util/reportUnhandledError';
import { nextNotification, errorNotification, COMPLETE_NOTIFICATION } from './NotificationFactories';
import { timeoutProvider } from './scheduler/timeoutProvider';

export interface SubscriberOverrides<T> {
/**
* If provided, this function will be called whenever the {@link Subscriber}'s
* `next` method is called, with the value that was passed to that call. If
* an error is thrown within this function, it will be handled and passed to
* the destination's `error` method.
* @param value The value that is being observed from the source.
*/
next?: (value: T) => void;
/**
* If provided, this function will be called whenever the {@link Subscriber}'s
* `error` method is called, with the error that was passed to that call. If
* an error is thrown within this function, it will be handled and passed to
* the destination's `error` method.
* @param err An error that has been thrown by the source observable.
*/
error?: (err: any) => void;
/**
* If provided, this function will be called whenever the {@link Subscriber}'s
* `complete` method is called. If an error is thrown within this function, it
* will be handled and passed to the destination's `error` method.
*/
complete?: () => void;
/**
* If provided, this function will be called after all teardown has occurred
* for this {@link Subscriber}. This is generally used for cleanup purposes
* during operator development.
*/
finalize?: () => void;
}

/**
* Implements the {@link Observer} interface and extends the
* {@link Subscription} class. While the {@link Observer} is the public API for
Expand All @@ -20,23 +51,65 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
/** @internal */
protected destination: Observer<T>;

/** @internal */
protected readonly _nextOverride: ((value: T) => void) | null = null;
/** @internal */
protected readonly _errorOverride: ((err: any) => void) | null = null;
/** @internal */
protected readonly _completeOverride: (() => void) | null = null;
/** @internal */
protected readonly _onFinalize: (() => void) | null = null;

/**
* @deprecated Do not create instances of `Subscriber` directly. Use {@link operate} instead.
*/
constructor(destination?: Subscriber<T> | Partial<Observer<T>> | ((value: T) => void) | null);

/**
* @internal
*/
constructor(destination: Subscriber<any> | Partial<Observer<any>> | ((value: any) => void) | null, overrides: SubscriberOverrides<T>);

/**
* Creates an instance of an RxJS Subscriber. This is the workhorse of the library.
*
* If another instance of Subscriber is passed in, it will automatically wire up unsubscription
* between this instnace and the passed in instance.
* between this instance and the passed in instance.
*
* If a partial or full observer is passed in, it will be wrapped and appropriate safeguards will be applied.
*
* If a next-handler function is passed in, it will be wrapped and appropriate safeguards will be applied.
*
* @param destination A subscriber, partial observer, or function that receives the next value.
* @deprecated Do not create instances of `Subscriber` directly. Use {@link operate} instead.
*/
constructor(destination?: Subscriber<T> | Partial<Observer<T>> | ((value: T) => void) | null) {
constructor(destination?: Subscriber<T> | Partial<Observer<T>> | ((value: T) => void) | null, overrides?: SubscriberOverrides<T>) {
super();

// The only way we know that error reporting safety has been applied is if we own it.
this.destination = destination instanceof Subscriber ? destination : createSafeObserver(destination);

this._nextOverride = overrides?.next ?? null;
this._errorOverride = overrides?.error ?? null;
this._completeOverride = overrides?.complete ?? null;
this._onFinalize = overrides?.finalize ?? null;

// It's important - for performance reasons - that all of this class's
// members are initialized and that they are always initialized in the same
// order. This will ensure that all Subscriber instances have the
// same hidden class in V8. This, in turn, will help keep the number of
// hidden classes involved in property accesses within the base class as
// low as possible. If the number of hidden classes involved exceeds four,
// the property accesses will become megamorphic and performance penalties
// will be incurred - i.e. inline caches won't be used.
//
// The reasons for ensuring all instances have the same hidden class are
// further discussed in this blog post from Benedikt Meurer:
// https://benediktmeurer.de/2018/03/23/impact-of-polymorphism-on-component-based-frameworks-like-react/
this._next = this._nextOverride ? overrideNext : this._next;
this._error = this._errorOverride ? overrideError : this._error;
this._complete = this._completeOverride ? overrideComplete : this._complete;

// Automatically chain subscriptions together here.
// if destination appears to be one of our subscriptions, we'll chain it.
if (hasAddAndUnsubscribe(destination)) {
Expand Down Expand Up @@ -91,7 +164,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
if (!this.closed) {
this.isStopped = true;
super.unsubscribe();
this.destination = null!;
this._onFinalize?.();
}
}

Expand All @@ -116,6 +189,34 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
}
}

function overrideNext<T>(this: Subscriber<T>, value: T): void {
try {
this._nextOverride!(value);
} catch (error) {
this.destination.error(error);
}
}

function overrideError(this: Subscriber<unknown>, err: any): void {
try {
this._errorOverride!(err);
} catch (error) {
this.destination.error(error);
} finally {
this.unsubscribe();
}
}

function overrideComplete(this: Subscriber<unknown>): void {
try {
this._completeOverride!();
} catch (error) {
this.destination.error(error);
} finally {
this.unsubscribe();
}
}

class ConsumerObserver<T> implements Observer<T> {
constructor(private partialObserver: Partial<Observer<T>>) {}

Expand Down Expand Up @@ -172,3 +273,36 @@ function handleStoppedNotification(notification: ObservableNotification<any>, su
function hasAddAndUnsubscribe(value: any): value is Subscription {
return value && isFunction(value.unsubscribe) && isFunction(value.add);
}

export interface OperateConfig<In, Out> extends SubscriberOverrides<In> {
benlesh marked this conversation as resolved.
Show resolved Hide resolved
/**
* The destination subscriber to forward notifications to. This is also the
* subscriber that will receive unhandled errors if your `next`, `error`, or `complete`
* overrides throw.
*/
destination: Subscriber<Out>;
}

/**
* Creates a new {@link Subscriber} instance that passes notifications on to the
* supplied `destination`. The overrides provided in the `config` argument for
* `next`, `error`, and `complete` will be called in such a way that any
* errors are caught and forwarded to the destination's `error` handler. The returned
* `Subscriber` will be "chained" to the `destination` such that when `unsubscribe` is
* called on the `destination`, the returned `Subscriber` will also be unsubscribed.
*
* Advanced: This ensures that subscriptions are properly wired up prior to starting the
* subcription logic. This prevents "synchronous firehose" scenarios where an
* inner observable from a flattening operation cannot be stopped by a downstream
* terminal operator like `take`.
*
* This is a utility designed to be used to create new operators for observables.
*
* For examples, please see our code base.
*
* @param config The configuration for creating a new subscriber for an operator.
* @returns A new subscriber that is chained to the destination.
*/
export function operate<In, Out>({ destination, ...subscriberOverrides }: OperateConfig<In, Out>) {
return new Subscriber(destination, subscriberOverrides);
}