Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concatStatic): missing export creating breaking change #2999

Merged
merged 2 commits into from Dec 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 100 additions & 2 deletions src/observable/merge.ts
@@ -1,3 +1,101 @@
import { mergeStatic } from '../operator/merge';
import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { ArrayObservable } from './ArrayObservable';
import { isScheduler } from '../util/isScheduler';
import { mergeAll } from '../operators/mergeAll';

export const merge = mergeStatic;
/* tslint:disable:max-line-length */
export function merge<T>(v1: ObservableInput<T>, scheduler?: IScheduler): Observable<T>;
export function merge<T>(v1: ObservableInput<T>, concurrent?: number, scheduler?: IScheduler): Observable<T>;
export function merge<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<T | T2>;
export function merge<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2>;
export function merge<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function merge<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function merge<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function merge<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function merge<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function merge<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function merge<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function merge<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function merge<T>(...observables: (ObservableInput<T> | IScheduler | number)[]): Observable<T>;
export function merge<T, R>(...observables: (ObservableInput<any> | IScheduler | number)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
* Creates an output Observable which concurrently emits all values from every
* given input Observable.
*
* <span class="informal">Flattens multiple Observables together by blending
* their values into one Observable.</span>
*
* <img src="./img/merge.png" width="100%">
*
* `merge` subscribes to each given input Observable (as arguments), and simply
* forwards (without doing any transformation) all the values from all the input
* Observables to the output Observable. The output Observable only completes
* once all input Observables have completed. Any error delivered by an input
* Observable will be immediately emitted on the output Observable.
*
* @example <caption>Merge together two Observables: 1s interval and clicks</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var timer = Rx.Observable.interval(1000);
* var clicksOrTimer = Rx.Observable.merge(clicks, timer);
* clicksOrTimer.subscribe(x => console.log(x));
*
* // Results in the following:
* // timer will emit ascending values, one every second(1000ms) to console
* // clicks logs MouseEvents to console everytime the "document" is clicked
* // Since the two streams are merged you see these happening
* // as they occur.
*
* @example <caption>Merge together 3 Observables, but only 2 run concurrently</caption>
* var timer1 = Rx.Observable.interval(1000).take(10);
* var timer2 = Rx.Observable.interval(2000).take(6);
* var timer3 = Rx.Observable.interval(500).take(10);
* var concurrent = 2; // the argument
* var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
* merged.subscribe(x => console.log(x));
*
* // Results in the following:
* // - First timer1 and timer2 will run concurrently
* // - timer1 will emit a value every 1000ms for 10 iterations
* // - timer2 will emit a value every 2000ms for 6 iterations
* // - after timer1 hits it's max iteration, timer2 will
* // continue, and timer3 will start to run concurrently with timer2
* // - when timer2 hits it's max iteration it terminates, and
* // timer3 will continue to emit a value every 500ms until it is complete
*
* @see {@link mergeAll}
* @see {@link mergeMap}
* @see {@link mergeMapTo}
* @see {@link mergeScan}
*
* @param {...ObservableInput} observables Input Observables to merge together.
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input
* Observables being subscribed to concurrently.
* @param {Scheduler} [scheduler=null] The IScheduler to use for managing
* concurrency of input Observables.
* @return {Observable} an Observable that emits items that are the result of
* every input Observable.
* @static true
* @name merge
* @owner Observable
*/
export function merge<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): Observable<R> {
let concurrent = Number.POSITIVE_INFINITY;
let scheduler: IScheduler = null;
let last: any = observables[observables.length - 1];
if (isScheduler(last)) {
scheduler = <IScheduler>observables.pop();
if (observables.length > 1 && typeof observables[observables.length - 1] === 'number') {
concurrent = <number>observables.pop();
}
} else if (typeof last === 'number') {
concurrent = <number>observables.pop();
}

if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) {
return <Observable<R>>observables[0];
}

return mergeAll(concurrent)(new ArrayObservable(<any>observables, scheduler)) as Observable<R>;
}
2 changes: 2 additions & 0 deletions src/operator/concat.ts
Expand Up @@ -2,6 +2,8 @@ import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { concat as higherOrder } from '../operators/concat';

export { concat as concatStatic } from '../observable/concat';

/* tslint:disable:max-line-length */
export function concat<T>(this: Observable<T>, scheduler?: IScheduler): Observable<T>;
export function concat<T, T2>(this: Observable<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<T | T2>;
Expand Down
2 changes: 1 addition & 1 deletion src/operator/merge.ts
Expand Up @@ -2,7 +2,7 @@ import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { merge as higherOrder } from '../operators/merge';

export { mergeStatic } from '../operators/merge';
export { merge as mergeStatic } from '../observable/merge';

/* tslint:disable:max-line-length */
export function merge<T>(this: Observable<T>, scheduler?: IScheduler): Observable<T>;
Expand Down
2 changes: 2 additions & 0 deletions src/operators/concat.ts
Expand Up @@ -3,6 +3,8 @@ import { IScheduler } from '../Scheduler';
import { OperatorFunction, MonoTypeOperatorFunction } from '../interfaces';
import { concat as concatStatic } from '../observable/concat';

export { concat as concatStatic } from '../observable/concat';

/* tslint:disable:max-line-length */
export function concat<T>(scheduler?: IScheduler): MonoTypeOperatorFunction<T>;
export function concat<T, T2>(v2: ObservableInput<T2>, scheduler?: IScheduler): OperatorFunction<T, T | T2>;
Expand Down
85 changes: 17 additions & 68 deletions src/operators/merge.ts
@@ -1,9 +1,9 @@
import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { ArrayObservable } from '../observable/ArrayObservable';
import { mergeAll } from './mergeAll';
import { isScheduler } from '../util/isScheduler';
import { OperatorFunction, MonoTypeOperatorFunction } from '../interfaces';
import { merge as mergeStatic } from '../observable/merge';

export { merge as mergeStatic } from '../observable/merge';

/* tslint:disable:max-line-length */
export function merge<T>(scheduler?: IScheduler): MonoTypeOperatorFunction<T>;
Expand All @@ -21,27 +21,6 @@ export function merge<T, T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: Observ
export function merge<T>(...observables: Array<ObservableInput<T> | IScheduler | number>): MonoTypeOperatorFunction<T>;
export function merge<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */

export function merge<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift.call(mergeStatic(source, ...observables));
}

/* tslint:disable:max-line-length */
export function mergeStatic<T>(v1: ObservableInput<T>, scheduler?: IScheduler): Observable<T>;
export function mergeStatic<T>(v1: ObservableInput<T>, concurrent?: number, scheduler?: IScheduler): Observable<T>;
export function mergeStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<T | T2>;
export function mergeStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2>;
export function mergeStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function mergeStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function mergeStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function mergeStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function mergeStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function mergeStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function mergeStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function mergeStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function mergeStatic<T>(...observables: (ObservableInput<T> | IScheduler | number)[]): Observable<T>;
export function mergeStatic<T, R>(...observables: (ObservableInput<any> | IScheduler | number)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
* Creates an output Observable which concurrently emits all values from every
* given input Observable.
Expand All @@ -51,73 +30,43 @@ export function mergeStatic<T, R>(...observables: (ObservableInput<any> | ISched
*
* <img src="./img/merge.png" width="100%">
*
* `merge` subscribes to each given input Observable (as arguments), and simply
* forwards (without doing any transformation) all the values from all the input
* Observables to the output Observable. The output Observable only completes
* once all input Observables have completed. Any error delivered by an input
* Observable will be immediately emitted on the output Observable.
* `merge` subscribes to each given input Observable (either the source or an
* Observable given as argument), and simply forwards (without doing any
* transformation) all the values from all the input Observables to the output
* Observable. The output Observable only completes once all input Observables
* have completed. Any error delivered by an input Observable will be immediately
* emitted on the output Observable.
*
* @example <caption>Merge together two Observables: 1s interval and clicks</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var timer = Rx.Observable.interval(1000);
* var clicksOrTimer = Rx.Observable.merge(clicks, timer);
* var clicksOrTimer = clicks.merge(timer);
* clicksOrTimer.subscribe(x => console.log(x));
*
* // Results in the following:
* // timer will emit ascending values, one every second(1000ms) to console
* // clicks logs MouseEvents to console everytime the "document" is clicked
* // Since the two streams are merged you see these happening
* // as they occur.
*
* @example <caption>Merge together 3 Observables, but only 2 run concurrently</caption>
* var timer1 = Rx.Observable.interval(1000).take(10);
* var timer2 = Rx.Observable.interval(2000).take(6);
* var timer3 = Rx.Observable.interval(500).take(10);
* var concurrent = 2; // the argument
* var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
* var merged = timer1.merge(timer2, timer3, concurrent);
* merged.subscribe(x => console.log(x));
*
* // Results in the following:
* // - First timer1 and timer2 will run concurrently
* // - timer1 will emit a value every 1000ms for 10 iterations
* // - timer2 will emit a value every 2000ms for 6 iterations
* // - after timer1 hits it's max iteration, timer2 will
* // continue, and timer3 will start to run concurrently with timer2
* // - when timer2 hits it's max iteration it terminates, and
* // timer3 will continue to emit a value every 500ms until it is complete
*
* @see {@link mergeAll}
* @see {@link mergeMap}
* @see {@link mergeMapTo}
* @see {@link mergeScan}
*
* @param {...ObservableInput} observables Input Observables to merge together.
* @param {ObservableInput} other An input Observable to merge with the source
* Observable. More than one input Observables may be given as argument.
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input
* Observables being subscribed to concurrently.
* @param {Scheduler} [scheduler=null] The IScheduler to use for managing
* concurrency of input Observables.
* @return {Observable} an Observable that emits items that are the result of
* @return {Observable} An Observable that emits items that are the result of
* every input Observable.
* @static true
* @name merge
* @method merge
* @owner Observable
*/
export function mergeStatic<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): Observable<R> {
let concurrent = Number.POSITIVE_INFINITY;
let scheduler: IScheduler = null;
let last: any = observables[observables.length - 1];
if (isScheduler(last)) {
scheduler = <IScheduler>observables.pop();
if (observables.length > 1 && typeof observables[observables.length - 1] === 'number') {
concurrent = <number>observables.pop();
}
} else if (typeof last === 'number') {
concurrent = <number>observables.pop();
}

if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) {
return <Observable<R>>observables[0];
}

return mergeAll(concurrent)(new ArrayObservable(<any>observables, scheduler)) as Observable<R>;
export function merge<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift.call(mergeStatic(source, ...observables));
}