Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rxjs): expand no longer supports a scheduler parameter #7431

Merged
merged 1 commit into from Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 1 addition & 40 deletions packages/rxjs/spec/operators/expand-spec.ts
Expand Up @@ -30,23 +30,6 @@ describe('expand', () => {
});
});

it('should work with scheduler', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --x----| ', { x: 1 });
const e1subs = ' ^------! ';
const e2 = cold(' --c| ', { c: 2 });
// --c|
// --c|
const expected = '--a-b-c-d|';
const values = { a: 1, b: 2, c: 4, d: 8 };

const result = e1.pipe(expand((x) => (x === 8 ? EMPTY : e2.pipe(map((c) => c * x))), Infinity, testScheduler));

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should map and recursively flatten', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const values = {
Expand Down Expand Up @@ -470,35 +453,13 @@ describe('expand', () => {
return cold(e2shape, { z: x + x });
};

const result = e1.pipe(expand(project, undefined, undefined));
const result = e1.pipe(expand(project, undefined));

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should work with the AsapScheduler', (done) => {
const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
of(0)
.pipe(
expand((x) => of(x + 1), Infinity, asapScheduler),
take(10),
toArray()
)
.subscribe({ next: (actual) => expect(actual).to.deep.equal(expected), error: done, complete: done });
});

it('should work with the AsyncScheduler', (done) => {
const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
of(0)
.pipe(
expand((x) => of(x + 1), Infinity, asyncScheduler),
take(10),
toArray()
)
.subscribe({ next: (actual) => expect(actual).to.deep.equal(expected), error: done, complete: done });
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
Expand Down
21 changes: 4 additions & 17 deletions packages/rxjs/src/internal/operators/expand.ts
@@ -1,21 +1,10 @@
import type { OperatorFunction, ObservableInput, ObservedValueOf, SchedulerLike } from '../types.js';
import type { OperatorFunction, ObservableInput, ObservedValueOf } from '../types.js';
import { Observable } from '../Observable.js';
import { mergeInternals } from './mergeInternals.js';

export function expand<T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
concurrent?: number,
scheduler?: SchedulerLike
): OperatorFunction<T, ObservedValueOf<O>>;
/**
* @deprecated The `scheduler` parameter will be removed in v8. If you need to schedule the inner subscription,
* use `subscribeOn` within the projection function: `expand((value) => fn(value).pipe(subscribeOn(scheduler)))`.
* Details: Details: https://rxjs.dev/deprecations/scheduler-argument
*/
export function expand<T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
concurrent: number | undefined,
scheduler: SchedulerLike
concurrent?: number
): OperatorFunction<T, ObservedValueOf<O>>;

/**
Expand Down Expand Up @@ -70,8 +59,7 @@ export function expand<T, O extends ObservableInput<unknown>>(
*/
export function expand<T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
concurrent = Infinity,
scheduler?: SchedulerLike
concurrent = Infinity
): OperatorFunction<T, ObservedValueOf<O>> {
concurrent = (concurrent || 0) < 1 ? Infinity : concurrent;
return (source) =>
Expand All @@ -89,8 +77,7 @@ export function expand<T, O extends ObservableInput<unknown>>(
undefined,

// Expand-specific
true, // Use expand path
scheduler // Inner subscription scheduler
true // Use expand path
)
);
}
25 changes: 4 additions & 21 deletions packages/rxjs/src/internal/operators/mergeInternals.ts
@@ -1,7 +1,6 @@
import type { Observable, Subscriber} from '../Observable.js';
import type { Observable, Subscriber } from '../Observable.js';
import { from, operate } from '../Observable.js';
import type { ObservableInput, SchedulerLike } from '../types.js';
import { executeSchedule } from '../util/executeSchedule.js';
import type { ObservableInput } from '../types.js';

/**
* A process embodying the general "merge" strategy. This is used in
Expand All @@ -22,9 +21,7 @@ export function mergeInternals<T, R>(
project: (value: T, index: number) => ObservableInput<R>,
concurrent: number,
onBeforeNext?: (innerValue: R) => void,
expand?: boolean,
innerSubScheduler?: SchedulerLike,
additionalFinalizer?: () => void
expand?: boolean
) {
// Buffered values, in the event of going over our concurrency limit
const buffer: T[] = [];
Expand Down Expand Up @@ -107,15 +104,7 @@ export function mergeInternals<T, R>(
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
if (innerSubScheduler) {
executeSchedule(destination, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
doInnerSub(buffer.shift()!);
}
// Check to see if we can complete, and complete if so.
checkComplete();
Expand All @@ -140,10 +129,4 @@ export function mergeInternals<T, R>(
},
})
);

// Additional finalization (for when the destination is torn down).
// Other finalization is added implicitly via subscription above.
return () => {
additionalFinalizer?.();
};
}
10 changes: 6 additions & 4 deletions packages/rxjs/src/internal/operators/mergeScan.ts
Expand Up @@ -77,17 +77,19 @@ export function mergeScan<T, R>(
// The accumulated state.
let state = seed;

return mergeInternals(
mergeInternals(
source,
subscriber,
(value, index) => accumulator(state, value, index),
concurrent,
(value) => {
state = value;
},
false,
undefined,
() => (state = null!)
false
);

return () => {
state = null!;
};
});
}