Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/internal/observable/BoundCallbackObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export class BoundCallbackObservable<T> extends Observable<T> {
static create<T>(func: Function,
selector: Function | void = undefined,
scheduler?: IScheduler): (...args: any[]) => Observable<T> {
return function(this: any, ...args: any[]): Observable<T> {
return function (this: any, ...args: any[]): Observable<T> {
return new BoundCallbackObservable<T>(func, <any>selector, args, this, scheduler);
};
}
Expand Down Expand Up @@ -201,7 +201,7 @@ export class BoundCallbackObservable<T> extends Observable<T> {
const result = tryCatch(selector).apply(this, innerArgs);
if (result === errorObject) {
subject.error(errorObject.e);
} else {
} else {
subject.next(result);
subject.complete();
}
Expand All @@ -220,7 +220,8 @@ export class BoundCallbackObservable<T> extends Observable<T> {
}
return subject.subscribe(subscriber);
} else {
return scheduler.schedule(BoundCallbackObservable.dispatch, 0, { source: this, subscriber, context: this.context });
return scheduler.schedule<{ source: BoundCallbackObservable<T>, subscriber: Subscriber<T>, context: any }>
(BoundCallbackObservable.dispatch, 0, { source: this, subscriber, context: this.context });
}
}

Expand All @@ -239,13 +240,13 @@ export class BoundCallbackObservable<T> extends Observable<T> {
if (selector) {
const result = tryCatch(selector).apply(this, innerArgs);
if (result === errorObject) {
self.add(scheduler.schedule(dispatchError, 0, { err: errorObject.e, subject }));
self.add(scheduler.schedule<DispatchErrorArg<T>>(dispatchError, 0, { err: errorObject.e, subject }));
} else {
self.add(scheduler.schedule(dispatchNext, 0, { value: result, subject }));
self.add(scheduler.schedule<DispatchNextArg<T>>(dispatchNext, 0, { value: result, subject }));
}
} else {
const value = innerArgs.length <= 1 ? innerArgs[0] : innerArgs;
self.add(scheduler.schedule(dispatchNext, 0, { value, subject }));
self.add(scheduler.schedule<DispatchNextArg<T>>(dispatchNext, 0, { value, subject }));
}
};
// use named function to pass values in without closure
Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/BoundNodeCallbackObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ export class BoundNodeCallbackObservable<T> extends Observable<T> {
}
return subject.subscribe(subscriber);
} else {
return scheduler.schedule(dispatch, 0, { source: this, subscriber, context: this.context });
return scheduler.schedule<DispatchState<T>>(dispatch, 0, { source: this, subscriber, context: this.context });
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/SubscribeOnObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ export class SubscribeOnObservable<T> extends Observable<T> {
const source = this.source;
const scheduler = this.scheduler;

return scheduler.schedule(SubscribeOnObservable.dispatch, delay, {
return scheduler.schedule<DispatchArg<any>>(SubscribeOnObservable.dispatch, delay, {
source, subscriber
});
}
}
}
2 changes: 1 addition & 1 deletion src/internal/observable/dom/AjaxObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ export class AjaxSubscriber<T> extends Subscriber<Event> {
(<any>xhrError).progressSubscriber = progressSubscriber;
}

function xhrReadyStateChange(this: XMLHttpRequest, e: ProgressEvent) {
function xhrReadyStateChange(this: XMLHttpRequest, e: Event) {
const { subscriber, progressSubscriber, request } = (<any>xhrReadyStateChange);
if (this.readyState === 4) {
// normalize IE9 bug (http://bugs.jquery.com/ticket/1450)
Expand Down
4 changes: 3 additions & 1 deletion src/internal/observable/pairs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ export function pairs<T>(obj: Object, scheduler?: IScheduler): Observable<[strin
return new Observable<[string, T]>(subscriber => {
const keys = Object.keys(obj);
const subscription = new Subscription();
subscription.add(scheduler.schedule(dispatch, 0, { keys, index: 0, subscriber, subscription, obj }));
subscription.add(
scheduler.schedule<{ keys: string[], index: number, subscriber: Subscriber<[string, T]>, subscription: Subscription, obj: Object }>
(dispatch, 0, { keys, index: 0, subscriber, subscription, obj }));
return subscription;
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/race.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
} else {
for (let i = 0; i < len && !this.hasFirst; i++) {
let observable = observables[i];
let subscription = subscribeToResult(this, observable, observable, i);
let subscription = subscribeToResult(this, observable, observable as any, i);

if (this.subscriptions) {
this.subscriptions.push(subscription);
Expand Down
28 changes: 14 additions & 14 deletions src/internal/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,18 @@ class Context<T> {
closeAction: Subscription;
}

type CreationState<T> = {
interface DispatchCreateArg<T> {
bufferTimeSpan: number;
bufferCreationInterval: number,
bufferCreationInterval: number;
subscriber: BufferTimeSubscriber<T>;
scheduler: IScheduler;
};

interface DispatchCloseArg<T> {
subscriber: BufferTimeSubscriber<T>;
context: Context<T>;
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
Expand All @@ -129,9 +134,9 @@ class BufferTimeSubscriber<T> extends Subscriber<T> {
this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
} else {
const closeState = { subscriber: this, context };
const creationState: CreationState<T> = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler };
this.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState));
this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, creationState));
const creationState: DispatchCreateArg<T> = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler };
this.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, closeState));
this.add(scheduler.schedule<DispatchCreateArg<T>>(dispatchBufferCreation, bufferCreationInterval, creationState));
}
}

Expand Down Expand Up @@ -216,22 +221,17 @@ function dispatchBufferTimeSpanOnly(this: Action<any>, state: any) {
}
}

interface DispatchArg<T> {
subscriber: BufferTimeSubscriber<T>;
context: Context<T>;
}

function dispatchBufferCreation<T>(this: Action<CreationState<T>>, state: CreationState<T>) {
function dispatchBufferCreation<T>(this: Action<DispatchCreateArg<T>>, state: DispatchCreateArg<T>) {
const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
const context = subscriber.openContext();
const action = <Action<CreationState<T>>>this;
const action = <Action<DispatchCreateArg<T>>>this;
if (!subscriber.closed) {
subscriber.add(context.closeAction = scheduler.schedule<DispatchArg<T>>(dispatchBufferClose, bufferTimeSpan, { subscriber, context }));
subscriber.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, { subscriber, context }));
action.schedule(state, bufferCreationInterval);
}
}

function dispatchBufferClose<T>(arg: DispatchArg<T>) {
function dispatchBufferClose<T>(arg: DispatchCloseArg<T>) {
const { subscriber, context } = arg;
subscriber.closeContext(context);
}
2 changes: 1 addition & 1 deletion src/internal/operators/concatMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export function concatMapTo<T, I, R>(observable: ObservableInput<I>, resultSelec
* @owner Observable
*/
export function concatMapTo<T, I, R>(
innerObservable: Observable<I>,
innerObservable: ObservableInput<I>,
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R
): OperatorFunction<T, R> {
return concatMap(() => innerObservable, resultSelector);
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/expand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
this.subscribeToProjection(result, value, index);
} else {
const state: DispatchArg<T, R> = { subscriber: this, result, value, index };
this.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state));
this.add(this.scheduler.schedule<DispatchArg<T, R>>(ExpandSubscriber.dispatch, 0, state));
}
} else {
this.buffer.push(value);
Expand Down
10 changes: 5 additions & 5 deletions src/internal/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
* @extends {Ignored}
*/
class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscription {
private groups: Map<K, Subject<T|R>> = null;
private groups: Map<K, Subject<T | R>> = null;
public attemptedToUnsubscribe: boolean = false;
public count: number = 0;

Expand Down Expand Up @@ -147,7 +147,7 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
let groups = this.groups;

if (!groups) {
groups = this.groups = new Map<K, Subject<T|R>>();
groups = this.groups = new Map<K, Subject<T | R>>();
}

let group = groups.get(key);
Expand All @@ -164,7 +164,7 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
}

if (!group) {
group = this.subjectSelector ? this.subjectSelector() : new Subject<R>();
group = (this.subjectSelector ? this.subjectSelector() : new Subject<R>()) as Subject<T | R>;
groups.set(key, group);
const groupedObservable = new GroupedObservable(key, group, this);
this.destination.next(groupedObservable);
Expand Down Expand Up @@ -231,7 +231,7 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
class GroupDurationSubscriber<K, T> extends Subscriber<T> {
constructor(private key: K,
private group: Subject<T>,
private parent: GroupBySubscriber<any, K, T>) {
private parent: GroupBySubscriber<any, K, T | any>) {
super(group);
}

Expand Down Expand Up @@ -265,7 +265,7 @@ export class GroupedObservable<K, T> extends Observable<T> {

protected _subscribe(subscriber: Subscriber<T>) {
const subscription = new Subscription();
const {refCountSubscription, groupSubject} = this;
const { refCountSubscription, groupSubject } = this;
if (refCountSubscription && !refCountSubscription.closed) {
subscription.add(new InnerRefCountSubscription(refCountSubscription));
}
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/mergeMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export function mergeMapTo<T, I, R>(observable: ObservableInput<I>, resultSelect
* @method mergeMapTo
* @owner Observable
*/
export function mergeMapTo<T, I, R>(innerObservable: Observable<I>,
export function mergeMapTo<T, I, R>(innerObservable: ObservableInput<I>,
resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number,
concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, R> {
if (typeof resultSelector === 'number') {
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/refCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Observable } from '../Observable';
export function refCount<T>(): MonoTypeOperatorFunction<T> {
return function refCountOperatorFunction(source: ConnectableObservable<T>): Observable<T> {
return source.lift(new RefCountOperator(source));
};
} as MonoTypeOperatorFunction<T>;
}

class RefCountOperator<T> implements Operator<T, T> {
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/startWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export function startWith<T>(...array: Array<T | IScheduler>): MonoTypeOperatorF
} else if (len > 0) {
return concatStatic(fromArray(array as T[], scheduler), source);
} else {
return concatStatic<T>(empty(scheduler), source);
return concatStatic<T>(empty(scheduler) as any, source);
}
};
}
2 changes: 1 addition & 1 deletion src/internal/operators/subscribeOn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SubscribeOnOperator<T> implements Operator<T, T> {
private delay: number) {
}
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return new SubscribeOnObservable(
return new SubscribeOnObservable<T>(
source, this.delay, this.scheduler
).subscribe(subscriber);
}
Expand Down
6 changes: 3 additions & 3 deletions src/internal/operators/switchMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export function switchMapTo<T, I, R>(observable: ObservableInput<I>, resultSelec
* @method switchMapTo
* @owner Observable
*/
export function switchMapTo<T, I, R>(innerObservable: Observable<I>,
export function switchMapTo<T, I, R>(innerObservable: ObservableInput<I>,
resultSelector?: (outerValue: T,
innerValue: I,
outerIndex: number,
Expand All @@ -63,7 +63,7 @@ export function switchMapTo<T, I, R>(innerObservable: Observable<I>,
}

class SwitchMapToOperator<T, I, R> implements Operator<T, I> {
constructor(private observable: Observable<I>,
constructor(private observable: ObservableInput<I>,
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) {
}

Expand All @@ -82,7 +82,7 @@ class SwitchMapToSubscriber<T, I, R> extends OuterSubscriber<T, I> {
private innerSubscription: Subscription;

constructor(destination: Subscriber<I>,
private inner: Observable<I>,
private inner: ObservableInput<I>,
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) {
super(destination);
}
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/throttleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ThrottleTimeSubscriber<T> extends Subscriber<T> {
this._hasTrailingValue = true;
}
} else {
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this }));
this.add(this.throttled = this.scheduler.schedule<DispatchArg<T>>(dispatchNext, this.duration, { subscriber: this }));
if (this.leading) {
this.destination.next(value);
}
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutSubscriber<T>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutSubscriber<T>>> this.scheduler.schedule(
this.add(this.action = (<Action<TimeoutSubscriber<T>>> this.scheduler.schedule<TimeoutSubscriber<T>>(
TimeoutSubscriber.dispatchTimeout, this.waitFor, this
)));
}
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/timeoutWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule(
this.add(this.action = (<Action<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule<TimeoutWithSubscriber<T, R>>(
TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this
)));
}
Expand Down
8 changes: 4 additions & 4 deletions src/internal/operators/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
if (windowCreationInterval !== null && windowCreationInterval >= 0) {
const closeState: CloseState<T> = { subscriber: this, window, context: <any>null };
const creationState: CreationState<T> = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler };
this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, closeState));
this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, creationState));
this.add(scheduler.schedule<CloseState<T>>(dispatchWindowClose, windowTimeSpan, closeState));
this.add(scheduler.schedule<CreationState<T>>(dispatchWindowCreation, windowCreationInterval, creationState));
} else {
const timeSpanOnlyState: TimeSpanOnlyState<T> = { subscriber: this, window, windowTimeSpan };
this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState));
this.add(scheduler.schedule<TimeSpanOnlyState<T>>(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState));
}
}

Expand Down Expand Up @@ -248,7 +248,7 @@ function dispatchWindowCreation<T>(this: Action<CreationState<T>>, state: Creati
const action = this;
let context: CloseWindowContext<T> = { action, subscription: <any>null };
const timeSpanState: CloseState<T> = { subscriber, window, context };
context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, timeSpanState);
context.subscription = scheduler.schedule<CloseState<T>>(dispatchWindowClose, windowTimeSpan, timeSpanState);
action.add(context.subscription);
action.schedule(state, windowCreationInterval);
}
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/windowToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class WindowToggleSubscriber<T, O> extends OuterSubscriber<T, any> {
private openings: Observable<O>,
private closingSelector: (openValue: O) => Observable<any>) {
super(destination);
this.add(this.openSubscription = subscribeToResult(this, openings, openings));
this.add(this.openSubscription = subscribeToResult(this, openings, openings as any));
}

protected _next(value: T) {
Expand Down Expand Up @@ -164,7 +164,7 @@ class WindowToggleSubscriber<T, O> extends OuterSubscriber<T, any> {
const subscription = new Subscription();
const context = { window, subscription };
this.contexts.push(context);
const innerSubscription = subscribeToResult(this, closingNotifier, context);
const innerSubscription = subscribeToResult(this, closingNotifier, context as any);

if (innerSubscription.closed) {
this.closeWindow(this.contexts.length - 1);
Expand Down
2 changes: 1 addition & 1 deletion src/internal/patching/operator/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@ export function combineLatest<T, R>(this: Observable<T>, ...observables: Array<O
observables = (<any>observables[0]).slice();
}

return this.lift.call(of(this, ...observables), new CombineLatestOperator(project));
return this.lift.call(of(this, ...observables as Array<Observable<any>>), new CombineLatestOperator(project));
}
2 changes: 1 addition & 1 deletion src/internal/patching/operator/concatMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export function concatMapTo<T, I, R>(this: Observable<T>, observable: Observable
* @method concatMapTo
* @owner Observable
*/
export function concatMapTo<T, I, R>(this: Observable<T>, innerObservable: Observable<I>,
export function concatMapTo<T, I, R>(this: Observable<T>, innerObservable: ObservableInput<I>,
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable<R> {
return higherOrder(innerObservable, resultSelector)(this);
}
2 changes: 1 addition & 1 deletion src/internal/patching/operator/mergeMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export function mergeMapTo<T, I, R>(this: Observable<T>, observable: ObservableI
* @method mergeMapTo
* @owner Observable
*/
export function mergeMapTo<T, I, R>(this: Observable<T>, innerObservable: Observable<I>,
export function mergeMapTo<T, I, R>(this: Observable<T>, innerObservable: ObservableInput<I>,
resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number,
concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
return higherOrder(innerObservable, resultSelector as any, concurrent)(this) as Observable<R>;
Expand Down
2 changes: 1 addition & 1 deletion src/internal/patching/operator/publishLast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import { publishLast as higherOrder } from '../../operators/publishLast';
*/
export function publishLast<T>(this: Observable<T>): ConnectableObservable<T> {
//TODO(benlesh): correct type-flow through here.
return higherOrder()(this) as ConnectableObservable<T>;
return higherOrder<T>()(this) as ConnectableObservable<T>;
}
6 changes: 4 additions & 2 deletions src/internal/patching/operator/scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ export function scan<T, R>(this: Observable<T>, accumulator: (acc: R, value: T,
* @method scan
* @owner Observable
*/
export function scan<T, R>(this: Observable<T>, accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): Observable<R> {
export function scan<T, R>(this: Observable<T>,
accumulator: (acc: T | Array<T> | R, value: T, index: number) => T | Array<T> | R,
seed?: T | R): Observable<R> {
if (arguments.length >= 2) {
return higherOrderScan(accumulator, seed)(this) as Observable<R>;
}
return higherOrderScan(accumulator)(this);
return higherOrderScan(accumulator)(this) as Observable<R>;
}
Loading