Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: no internal pipe method use #7321

Merged
merged 1 commit into from Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 5 additions & 8 deletions packages/rxjs/src/internal/observable/bindCallbackInternals.ts
@@ -1,10 +1,9 @@
import type { SchedulerLike } from '../types.js';
import { isScheduler } from '../util/isScheduler.js';
import { Observable } from '../Observable.js';
import { subscribeOn } from '../operators/subscribeOn.js';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs.js';
import { observeOn } from '../operators/observeOn.js';
import { AsyncSubject } from '../AsyncSubject.js';
import { scheduled } from '../scheduled/scheduled.js';

export function bindCallbackInternals(
isNodeStyle: boolean,
Expand All @@ -18,9 +17,9 @@ export function bindCallbackInternals(
} else {
// The user provided a result selector.
return function (this: any, ...args: any[]) {
return (bindCallbackInternals(isNodeStyle, callbackFunc, scheduler) as any)
.apply(this, args)
.pipe(mapOneOrManyArgs(resultSelector as any));
return mapOneOrManyArgs(resultSelector as any)(
(bindCallbackInternals(isNodeStyle, callbackFunc, scheduler) as any).apply(this, args)
);
};
}
}
Expand All @@ -29,9 +28,7 @@ export function bindCallbackInternals(
// to compose that behavior for the user.
if (scheduler) {
return function (this: any, ...args: any[]) {
return (bindCallbackInternals(isNodeStyle, callbackFunc) as any)
.apply(this, args)
.pipe(subscribeOn(scheduler!), observeOn(scheduler!));
return scheduled((bindCallbackInternals(isNodeStyle, callbackFunc) as any).apply(this, args), scheduler!);
};
}

Expand Down
2 changes: 1 addition & 1 deletion packages/rxjs/src/internal/observable/dom/fetch.ts
Expand Up @@ -107,7 +107,7 @@ export function fromFetch<T>(
// This flag exists to make sure we don't `abort()` the fetch upon tearing down
// this observable after emitting a Response. Aborting in such circumstances
// would also abort subsequent methods - like `json()` - that could be called
// on the Response. Consider: `fromFetch().pipe(take(1), mergeMap(res => res.json()))`
// on the Response. Consider: `rx(fromFetch(), take(1), mergeMap(res => res.json()))`
let abortable = true;

// If the user provided an init configuration object,
Expand Down
2 changes: 1 addition & 1 deletion packages/rxjs/src/internal/observable/forkJoin.ts
Expand Up @@ -180,5 +180,5 @@ export function forkJoin(...args: any[]): Observable<any> {
);
}
});
return resultSelector ? result.pipe(mapOneOrManyArgs(resultSelector)) : result;
return resultSelector ? mapOneOrManyArgs(resultSelector)(result) : result;
}
4 changes: 2 additions & 2 deletions packages/rxjs/src/internal/observable/fromEvent.ts
@@ -1,4 +1,4 @@
import type { Subscriber} from '../Observable.js';
import type { Subscriber } from '../Observable.js';
import { Observable, isArrayLike, isFunction } from '../Observable.js';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs.js';

Expand Down Expand Up @@ -252,7 +252,7 @@ export function fromEvent<T>(
}

if (resultSelector) {
return fromEvent<T>(target, eventName as string, options as EventListenerOptions).pipe(mapOneOrManyArgs(resultSelector));
return mapOneOrManyArgs(resultSelector)(fromEvent<T>(target, eventName as string, options as EventListenerOptions));
}

const isValidTarget = isNodeStyleEventEmitter(target) || isJQueryStyleEventEmitter(target) || isEventTarget(target);
Expand Down
2 changes: 1 addition & 1 deletion packages/rxjs/src/internal/observable/fromEventPattern.ts
Expand Up @@ -138,7 +138,7 @@ export function fromEventPattern<T>(
resultSelector?: (...args: any[]) => T
): Observable<T | T[]> {
if (resultSelector) {
return fromEventPattern<T>(addHandler, removeHandler).pipe(mapOneOrManyArgs(resultSelector));
return mapOneOrManyArgs(resultSelector)(fromEventPattern<T>(addHandler, removeHandler));
}

return new Observable<T | T[]>((subscriber) => {
Expand Down
17 changes: 12 additions & 5 deletions packages/rxjs/src/internal/operators/delayWhen.ts
@@ -1,11 +1,11 @@
import type { Observable} from '../Observable.js';
import { from } from '../Observable.js';
import type { Observable } from '../Observable.js';
import type { MonoTypeOperatorFunction, ObservableInput } from '../types.js';
import { concat } from '../observable/concat.js';
import { take } from './take.js';
import { ignoreElements } from './ignoreElements.js';
import { mapTo } from './mapTo.js';
import { mergeMap } from './mergeMap.js';
import { rx } from '../util/rx.js';
import { map } from './map.js';

/** @deprecated The `subscriptionDelay` parameter will be removed in v8. */
export function delayWhen<T>(
Expand Down Expand Up @@ -96,8 +96,15 @@ export function delayWhen<T>(
if (subscriptionDelay) {
// DEPRECATED PATH
return (source: Observable<T>) =>
concat(subscriptionDelay.pipe(take(1), ignoreElements()), source.pipe(delayWhen(delayDurationSelector)));
concat(rx(subscriptionDelay, take(1), ignoreElements()), rx(source, delayWhen(delayDurationSelector)));
}

return mergeMap((value, index) => from(delayDurationSelector(value, index)).pipe(take(1), mapTo(value)));
return mergeMap(
(value, index) =>
rx(
delayDurationSelector(value, index),
take(1),
map(() => value)
) as Observable<T>
);
}
35 changes: 26 additions & 9 deletions packages/rxjs/src/internal/operators/elementAt.ts
@@ -1,9 +1,6 @@
import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError.js';
import type { Observable } from '../Observable.js';
import { Observable, operate } from '../Observable.js';
import type { OperatorFunction } from '../types.js';
import { filter } from './filter.js';
import { throwIfEmpty } from './throwIfEmpty.js';
import { defaultIfEmpty } from './defaultIfEmpty.js';
import { take } from './take.js';

/**
Expand Down Expand Up @@ -60,9 +57,29 @@ export function elementAt<T, D = T>(index: number, defaultValue?: D): OperatorFu
}
const hasDefaultValue = arguments.length >= 2;
return (source: Observable<T>) =>
source.pipe(
filter((v, i) => i === index),
take(1),
hasDefaultValue ? defaultIfEmpty(defaultValue!) : throwIfEmpty(() => new ArgumentOutOfRangeError())
);
new Observable((destination) => {
let i = 0;
const operatorSubscriber = operate<T, T | D>({
destination,
next: (value) => {
if (i++ === index) {
// We want to unsubscribe from the source as soon as we know
// we can. This will prevent reentrancy issues if calling
// `destination.next()` happens to emit another value from source.
operatorSubscriber.unsubscribe();
destination.next(value);
destination.complete();
}
},
complete: () => {
if (!hasDefaultValue) {
destination.error(new ArgumentOutOfRangeError());
} else {
destination.next(defaultValue!);
destination.complete();
}
},
});
source.subscribe(operatorSubscriber);
});
}
38 changes: 28 additions & 10 deletions packages/rxjs/src/internal/operators/first.ts
@@ -1,11 +1,8 @@
import type { Observable } from '../Observable.js';
import { Observable, operate } from '../Observable.js';
import { EmptyError } from '../util/EmptyError.js';
import type { OperatorFunction, TruthyTypesOf } from '../types.js';
import { filter } from './filter.js';
import { take } from './take.js';
import { defaultIfEmpty } from './defaultIfEmpty.js';
import { throwIfEmpty } from './throwIfEmpty.js';
import { identity } from '../util/identity.js';

export function first<T, D = T>(predicate?: null, defaultValue?: D): OperatorFunction<T, T | D>;
export function first<T>(predicate: BooleanConstructor): OperatorFunction<T, TruthyTypesOf<T>>;
Expand Down Expand Up @@ -84,10 +81,31 @@ export function first<T, D>(
defaultValue?: D
): OperatorFunction<T, T | D> {
const hasDefaultValue = arguments.length >= 2;
return (source: Observable<T>) =>
source.pipe(
predicate ? filter((v, i) => predicate(v, i, source)) : identity,
take(1),
hasDefaultValue ? defaultIfEmpty(defaultValue!) : throwIfEmpty(() => new EmptyError())
);
return (source) =>
new Observable((destination) => {
let index = 0;
const operatorSubscriber = operate<T, T | D>({
destination,
next: (value) => {
const passed = predicate ? predicate(value, index++, source) : true;
if (passed) {
// We want to unsubscribe from the source as soon as we know
// we can. This will prevent reentrancy issues if calling
// `destination.next()` happens to emit another value from source.
operatorSubscriber.unsubscribe();
destination.next(value);
destination.complete();
}
},
complete: () => {
if (hasDefaultValue) {
destination.next(defaultValue!);
destination.complete();
} else {
destination.error(new EmptyError());
}
},
});
source.subscribe(operatorSubscriber);
});
}
2 changes: 1 addition & 1 deletion packages/rxjs/src/internal/operators/groupBy.ts
Expand Up @@ -169,7 +169,7 @@ export function groupBy<T, K, R>(
// out-of-band with our `subscriber` which is the downstream subscriber, or destination,
// in cases where a user unsubscribes from the main resulting subscription, but
// still has groups from this subscription subscribed and would expect values from it
// Consider: `source.pipe(groupBy(fn), take(2))`.
// Consider: `rx(source, groupBy(fn), take(2))`.
const groupBySourceSubscriber = operate({
destination,
next: (value: T) => {
Expand Down
39 changes: 28 additions & 11 deletions packages/rxjs/src/internal/operators/last.ts
@@ -1,11 +1,6 @@
import type { Observable } from '../Observable.js';
import { Observable, operate } from '../Observable.js';
import { EmptyError } from '../util/EmptyError.js';
import type { OperatorFunction, TruthyTypesOf } from '../types.js';
import { filter } from './filter.js';
import { takeLast } from './takeLast.js';
import { throwIfEmpty } from './throwIfEmpty.js';
import { defaultIfEmpty } from './defaultIfEmpty.js';
import { identity } from '../util/identity.js';

export function last<T>(predicate: BooleanConstructor): OperatorFunction<T, TruthyTypesOf<T>>;
export function last<T, D>(predicate: BooleanConstructor, defaultValue: D): OperatorFunction<T, TruthyTypesOf<T> | D>;
Expand Down Expand Up @@ -83,9 +78,31 @@ export function last<T, D>(
): OperatorFunction<T, T | D> {
const hasDefaultValue = arguments.length >= 2;
return (source: Observable<T>) =>
source.pipe(
predicate ? filter((v, i) => predicate(v, i, source)) : identity,
takeLast(1),
hasDefaultValue ? defaultIfEmpty(defaultValue!) : throwIfEmpty(() => new EmptyError())
);
new Observable((destination) => {
let index = 0;
let found = false;
let lastValue: T | D | undefined;
source.subscribe(
operate({
destination,
next(value) {
if (!predicate || predicate(value, index++, source)) {
found = true;
lastValue = value;
}
},
complete() {
if (found) {
destination.next(lastValue!);
destination.complete();
} else if (hasDefaultValue) {
destination.next(defaultValue!);
destination.complete();
} else {
destination.error(new EmptyError());
}
},
})
);
});
}