Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove deprecations: from with scheduler, combineLatest with scheduler #7167

Merged
merged 9 commits into from Feb 17, 2023
4 changes: 0 additions & 4 deletions spec-dtslint/observables/from-spec.ts
Expand Up @@ -52,10 +52,6 @@ it('should accept an array of Observables', () => {
// const o = from([of(1), ['test'], iterable]); // $__TODO__ExpectType Observable<IterableIterator<number> | Observable<number> | string[]>
// });

it('should support scheduler', () => {
const a = from([1, 2, 3], animationFrameScheduler); // $ExpectType Observable<number>
});

it('should accept a ReadableStream', () => {
const stream = new ReadableStream<string>({
pull(controller) {
Expand Down
43 changes: 0 additions & 43 deletions spec/observables/combineLatest-spec.ts
Expand Up @@ -41,22 +41,6 @@ describe('static combineLatest', () => {

expect(results).to.deep.equal(['done']);
});

it('should return EMPTY if passed an empty array and scheduler as the only argument', () => {
const results: string[] = [];
combineLatest([], rxTestScheduler).subscribe({
next: () => {
throw new Error('should not emit')
},
complete: () => {
results.push('done');
}
});

expect(results).to.deep.equal([]);
rxTestScheduler.flush();
expect(results).to.deep.equal(['done']);
});

it('should combineLatest the provided observables', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
Expand All @@ -70,33 +54,6 @@ describe('static combineLatest', () => {
});
});

it('should combine an immediately-scheduled source with an immediately-scheduled second', (done) => {
const a = of(1, 2, 3, queueScheduler);
const b = of(4, 5, 6, 7, 8, queueScheduler);
const r = [
[1, 4],
[2, 4],
[2, 5],
[3, 5],
[3, 6],
[3, 7],
[3, 8],
];

const actual: [number, number][] = [];
//type definition need to be updated
combineLatest(a, b, queueScheduler).subscribe(
{ next: (vals) => {
actual.push(vals);
}, error: () => {
done(new Error('should not be called'));
}, complete: () => {
expect(actual).to.deep.equal(r);
done();
} }
);
});

it('should accept array of observables', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const firstSource = hot(' ----a----b----c----|');
Expand Down
47 changes: 1 addition & 46 deletions spec/observables/from-promise-spec.ts
@@ -1,8 +1,6 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { asapScheduler, from } from 'rxjs';

declare const process: any;
import { from } from 'rxjs';

/** @test {fromPromise} */
describe('from (fromPromise)', () => {
Expand Down Expand Up @@ -86,49 +84,6 @@ describe('from (fromPromise)', () => {
} });
});

it('should emit a value from a resolved promise on a separate scheduler', (done) => {
const promise = Promise.resolve(42);
from(promise, asapScheduler)
.subscribe(
{ next: (x) => { expect(x).to.equal(42); }, error: (x) => {
done(new Error('should not be called'));
}, complete: () => {
done();
} });
});

it('should raise error from a rejected promise on a separate scheduler', (done) => {
const promise = Promise.reject('bad');
from(promise, asapScheduler)
.subscribe(
{ next: (x) => { done(new Error('should not be called')); }, error: (e) => {
expect(e).to.equal('bad');
done();
}, complete: () => {
done(new Error('should not be called'));
} });
});

it('should share the underlying promise with multiple subscribers on a separate scheduler', (done) => {
const promise = Promise.resolve(42);
const observable = from(promise, asapScheduler);

observable
.subscribe(
{ next: (x) => { expect(x).to.equal(42); }, error: (x) => {
done(new Error('should not be called'));
} });
setTimeout(() => {
observable
.subscribe(
{ next: (x) => { expect(x).to.equal(42); }, error: (x) => {
done(new Error('should not be called'));
}, complete: () => {
done();
} });
});
});

it('should not emit, throw or complete if immediately unsubscribed', (done) => {
const nextSpy = sinon.spy();
const throwSpy = sinon.spy();
Expand Down
17 changes: 0 additions & 17 deletions spec/observables/from-spec.ts
Expand Up @@ -273,23 +273,6 @@ describe('from', () => {
},
});
});
it(`should accept ${source.name} and scheduler`, (done) => {
let nextInvoked = false;
from(source.createValue(), asyncScheduler).subscribe({
next: (x) => {
nextInvoked = true;
expect(x).to.equal('x');
},
error: (x) => {
done(new Error('should not be called'));
},
complete: () => {
expect(nextInvoked).to.equal(true);
done();
},
});
expect(nextInvoked).to.equal(false);
});

it(`should accept a function that implements [Symbol.observable]`, (done) => {
const subject = new Subject<any>();
Expand Down
148 changes: 47 additions & 101 deletions src/internal/observable/combineLatest.ts
@@ -1,16 +1,15 @@
import { Observable } from '../Observable';
import { ObservableInput, SchedulerLike, ObservedValueOf, ObservableInputTuple } from '../types';
import { ObservableInput, ObservedValueOf, ObservableInputTuple } from '../types';
import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
import { Subscriber } from '../Subscriber';
import { from } from './from';
import { identity } from '../util/identity';
import { Subscription } from '../Subscription';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
import { popResultSelector, popScheduler } from '../util/args';
import { popResultSelector } from '../util/args';
import { createObject } from '../util/createObject';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { AnyCatcher } from '../AnyCatcher';
import { executeSchedule } from '../util/executeSchedule';
import { EMPTY } from './empty';

// combineLatest(any)
// We put this first because we need to catch cases where the user has supplied
Expand All @@ -28,37 +27,18 @@ export function combineLatest<T extends AnyCatcher>(arg: T): Observable<unknown>
// combineLatest([a, b, c])
export function combineLatest(sources: []): Observable<never>;
export function combineLatest<A extends readonly unknown[]>(sources: readonly [...ObservableInputTuple<A>]): Observable<A>;
/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function combineLatest<A extends readonly unknown[], R>(
sources: readonly [...ObservableInputTuple<A>],
resultSelector: (...values: A) => R,
scheduler: SchedulerLike
): Observable<R>;
export function combineLatest<A extends readonly unknown[], R>(
sources: readonly [...ObservableInputTuple<A>],
resultSelector: (...values: A) => R
): Observable<R>;
/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function combineLatest<A extends readonly unknown[]>(
sources: readonly [...ObservableInputTuple<A>],
scheduler: SchedulerLike
): Observable<A>;

// combineLatest(a, b, c)
/** @deprecated Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument */
export function combineLatest<A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): Observable<A>;
/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function combineLatest<A extends readonly unknown[], R>(
...sourcesAndResultSelectorAndScheduler: [...ObservableInputTuple<A>, (...values: A) => R, SchedulerLike]
): Observable<R>;
/** @deprecated Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument */
export function combineLatest<A extends readonly unknown[], R>(
...sourcesAndResultSelector: [...ObservableInputTuple<A>, (...values: A) => R]
): Observable<R>;
/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function combineLatest<A extends readonly unknown[]>(
...sourcesAndScheduler: [...ObservableInputTuple<A>, SchedulerLike]
): Observable<A>;

// combineLatest({a, b, c})
export function combineLatest(sourcesObject: { [K in any]: never }): Observable<never>;
Expand Down Expand Up @@ -192,14 +172,11 @@ export function combineLatest<T extends Record<string, ObservableInput<any>>>(
* An array of Observables must be given as the first argument.
* @param {function} [project] An optional function to project the values from
* the combined latest values into a new value on the output Observable.
* @param {SchedulerLike} [scheduler=null] The {@link SchedulerLike} to use for subscribing to
* each input Observable.
* @return {Observable} An Observable of projected values from the most recent
* values from each input Observable, or an array of the most recent values from
* each input Observable.
*/
export function combineLatest<O extends ObservableInput<any>, R>(...args: any[]): Observable<R> | Observable<ObservedValueOf<O>[]> {
const scheduler = popScheduler(args);
const resultSelector = popResultSelector(args);

const { args: observables, keys } = argsArgArrayOrObject(args);
Expand All @@ -208,13 +185,12 @@ export function combineLatest<O extends ObservableInput<any>, R>(...args: any[])
// If no observables are passed, or someone has passed an empty array
// of observables, or even an empty object POJO, we need to just
// complete (EMPTY), but we have to honor the scheduler provided if any.
return from([], scheduler as any);
return EMPTY;
}

const result = new Observable<ObservedValueOf<O>[]>(
combineLatestInit(
observables as ObservableInput<ObservedValueOf<O>>[],
scheduler,
keys
? // A handler for scrubbing the array of args into a dictionary.
(values) => createObject(keys, values)
Expand All @@ -226,79 +202,49 @@ export function combineLatest<O extends ObservableInput<any>, R>(...args: any[])
return resultSelector ? (result.pipe(mapOneOrManyArgs(resultSelector)) as Observable<R>) : result;
}

export function combineLatestInit(
observables: ObservableInput<any>[],
scheduler?: SchedulerLike,
valueTransform: (values: any[]) => any = identity
) {
export function combineLatestInit(observables: ObservableInput<any>[], valueTransform: (values: any[]) => any = identity) {
return (subscriber: Subscriber<any>) => {
// The outer subscription. We're capturing this in a function
// because we may have to schedule it.
maybeSchedule(
scheduler,
() => {
const { length } = observables;
// A store for the values each observable has emitted so far. We match observable to value on index.
const values = new Array(length);
// The number of currently active subscriptions, as they complete, we decrement this number to see if
// we are all done combining values, so we can complete the result.
let active = length;
// The number of inner sources that still haven't emitted the first value
// We need to track this because all sources need to emit one value in order
// to start emitting values.
let remainingFirstValues = length;
// The loop to kick off subscription. We're keying everything on index `i` to relate the observables passed
// in to the slot in the output array or the key in the array of keys in the output dictionary.
for (let i = 0; i < length; i++) {
maybeSchedule(
scheduler,
() => {
const source = from(observables[i], scheduler as any);
let hasFirstValue = false;
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
// When we get a value, record it in our set of values.
values[i] = value;
if (!hasFirstValue) {
// If this is our first value, record that.
hasFirstValue = true;
remainingFirstValues--;
}
if (!remainingFirstValues) {
// We're not waiting for any more
// first values, so we can emit!
subscriber.next(valueTransform(values.slice()));
}
},
() => {
if (!--active) {
// We only complete the result if we have no more active
// inner observables.
subscriber.complete();
}
}
)
);
},
subscriber
);
}
},
subscriber
);
const { length } = observables;
// A store for the values each observable has emitted so far. We match observable to value on index.
const values = new Array(length);
// The number of currently active subscriptions, as they complete, we decrement this number to see if
// we are all done combining values, so we can complete the result.
let active = length;
// The number of inner sources that still haven't emitted the first value
// We need to track this because all sources need to emit one value in order
// to start emitting values.
let remainingFirstValues = length;
// The loop to kick off subscription. We're keying everything on index `i` to relate the observables passed
// in to the slot in the output array or the key in the array of keys in the output dictionary.
for (let i = 0; i < length; i++) {
const source = from(observables[i]);
let hasFirstValue = false;
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
// When we get a value, record it in our set of values.
values[i] = value;
if (!hasFirstValue) {
// If this is our first value, record that.
hasFirstValue = true;
remainingFirstValues--;
}
if (!remainingFirstValues) {
// We're not waiting for any more
// first values, so we can emit!
subscriber.next(valueTransform(values.slice()));
}
},
() => {
if (!--active) {
// We only complete the result if we have no more active
// inner observables.
subscriber.complete();
}
}
)
);
}
};
}

/**
* A small utility to handle the couple of locations where we want to schedule if a scheduler was provided,
* but we don't if there was no scheduler.
*/
function maybeSchedule(scheduler: SchedulerLike | undefined, execute: () => void, subscription: Subscription) {
if (scheduler) {
executeSchedule(subscription, scheduler, execute);
} else {
execute();
}
}
4 changes: 3 additions & 1 deletion src/internal/observable/concat.ts
Expand Up @@ -2,6 +2,7 @@ import { Observable } from '../Observable';
import { ObservableInputTuple, SchedulerLike } from '../types';
import { concatAll } from '../operators/concatAll';
import { popScheduler } from '../util/args';
import { scheduled } from '../scheduled/scheduled';
import { from } from './from';

export function concat<T extends readonly unknown[]>(...inputs: [...ObservableInputTuple<T>]): Observable<T[number]>;
Expand Down Expand Up @@ -111,5 +112,6 @@ export function concat<T extends readonly unknown[]>(
* @param args Input Observables to concatenate.
*/
export function concat(...args: any[]): Observable<unknown> {
return concatAll()(from(args, popScheduler(args)));
const scheduler = popScheduler(args);
return concatAll()(scheduler ? scheduled(args, scheduler) : from(args));
}