Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/internal/ReplaySubject.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Subject } from './Subject';
import { IScheduler } from './Scheduler';
import { SchedulerLike } from './types';
import { queue } from './scheduler/queue';
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription';
Expand All @@ -16,7 +16,7 @@ export class ReplaySubject<T> extends Subject<T> {

constructor(bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
private scheduler?: IScheduler) {
private scheduler?: SchedulerLike) {
super();
this._bufferSize = bufferSize < 1 ? 1 : bufferSize;
this._windowTime = windowTime < 1 ? 1 : windowTime;
Expand Down
9 changes: 3 additions & 6 deletions src/internal/Scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { Action } from './scheduler/Action';
import { Subscription } from './Subscription';
import { SchedulerLike, SchedulerAction } from './types';

export interface IScheduler {
now(): number;
schedule<T>(work: (this: Action<T>, state?: T) => void, delay?: number, state?: T): Subscription;
}
/**
* An execution context and a data structure to order tasks and schedule their
* execution. Provides a notion of (potentially virtual) time, through the
Expand All @@ -21,7 +18,7 @@ export interface IScheduler {
*
* @class Scheduler
*/
export class Scheduler implements IScheduler {
export class Scheduler implements SchedulerLike {

public static now: () => number = Date.now ? Date.now : () => +new Date();

Expand Down Expand Up @@ -57,7 +54,7 @@ export class Scheduler implements IScheduler {
* @return {Subscription} A subscription in order to be able to unsubscribe
* the scheduled work.
*/
public schedule<T>(work: (this: Action<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
return new this.SchedulerAction<T>(this, work).schedule(state, delay);
}
}
9 changes: 4 additions & 5 deletions src/internal/observable/SubscribeOnObservable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Action } from '../scheduler/Action';
import { IScheduler } from '../Scheduler';
import { SchedulerLike, SchedulerAction } from '../types';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
Expand All @@ -17,18 +16,18 @@ export interface DispatchArg<T> {
* @hide true
*/
export class SubscribeOnObservable<T> extends Observable<T> {
static create<T>(source: Observable<T>, delay: number = 0, scheduler: IScheduler = asap): Observable<T> {
static create<T>(source: Observable<T>, delay: number = 0, scheduler: SchedulerLike = asap): Observable<T> {
return new SubscribeOnObservable(source, delay, scheduler);
}

static dispatch<T>(this: Action<T>, arg: DispatchArg<T>): Subscription {
static dispatch<T>(this: SchedulerAction<T>, arg: DispatchArg<T>): Subscription {
const { source, subscriber } = arg;
return this.add(source.subscribe(subscriber));
}

constructor(public source: Observable<T>,
private delayTime: number = 0,
private scheduler: IScheduler = asap) {
private scheduler: SchedulerLike = asap) {
super();
if (!isNumeric(delayTime) || delayTime < 0) {
this.delayTime = 0;
Expand Down
31 changes: 15 additions & 16 deletions src/internal/observable/bindCallback.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import { IScheduler } from '../Scheduler';
import { SchedulerLike, SchedulerAction } from '../types';
import { Observable } from '../Observable';
import { AsyncSubject } from '../AsyncSubject';
import { Subscriber } from '../Subscriber';
import { Action } from '../scheduler/Action';

// tslint:disable:max-line-length
export function bindCallback(callbackFunc: (callback: () => any) => any, scheduler?: IScheduler): () => Observable<void>;
export function bindCallback<R>(callbackFunc: (callback: (result: R) => any) => any, scheduler?: IScheduler): () => Observable<R>;
export function bindCallback<T, R>(callbackFunc: (v1: T, callback: (result: R) => any) => any, scheduler?: IScheduler): (v1: T) => Observable<R>;
export function bindCallback<T, T2, R>(callbackFunc: (v1: T, v2: T2, callback: (result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2) => Observable<R>;
export function bindCallback<T, T2, T3, R>(callbackFunc: (v1: T, v2: T2, v3: T3, callback: (result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3) => Observable<R>;
export function bindCallback<T, T2, T3, T4, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, callback: (result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4) => Observable<R>;
export function bindCallback<T, T2, T3, T4, T5, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, callback: (result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => Observable<R>;
export function bindCallback<T, T2, T3, T4, T5, T6, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6, callback: (result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => Observable<R>;
export function bindCallback<T>(callbackFunc: Function, scheduler?: IScheduler): (...args: any[]) => Observable<T>;
export function bindCallback(callbackFunc: (callback: () => any) => any, scheduler?: SchedulerLike): () => Observable<void>;
export function bindCallback<R>(callbackFunc: (callback: (result: R) => any) => any, scheduler?: SchedulerLike): () => Observable<R>;
export function bindCallback<T, R>(callbackFunc: (v1: T, callback: (result: R) => any) => any, scheduler?: SchedulerLike): (v1: T) => Observable<R>;
export function bindCallback<T, T2, R>(callbackFunc: (v1: T, v2: T2, callback: (result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2) => Observable<R>;
export function bindCallback<T, T2, T3, R>(callbackFunc: (v1: T, v2: T2, v3: T3, callback: (result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2, v3: T3) => Observable<R>;
export function bindCallback<T, T2, T3, T4, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, callback: (result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2, v3: T3, v4: T4) => Observable<R>;
export function bindCallback<T, T2, T3, T4, T5, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, callback: (result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => Observable<R>;
export function bindCallback<T, T2, T3, T4, T5, T6, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6, callback: (result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => Observable<R>;
export function bindCallback<T>(callbackFunc: Function, scheduler?: SchedulerLike): (...args: any[]) => Observable<T>;
// tslint:enable:max-line-length

/**
Expand Down Expand Up @@ -127,7 +126,7 @@ export function bindCallback<T>(callbackFunc: Function, scheduler?: IScheduler):
* @name bindCallback
*/
export function bindCallback<T>(callbackFunc: Function,
scheduler?: IScheduler): (...args: any[]) => Observable<T> {
scheduler?: SchedulerLike): (...args: any[]) => Observable<T> {
return function (this: any, ...args: any[]): Observable<T> {
const context = this;
let subject: AsyncSubject<T>;
Expand Down Expand Up @@ -171,12 +170,12 @@ interface DispatchState<T> {

interface ParamsContext<T> {
callbackFunc: Function;
scheduler: IScheduler;
scheduler: SchedulerLike;
context: any;
subject: AsyncSubject<T>;
}

function dispatch<T>(this: Action<DispatchState<T>>, state: DispatchState<T>) {
function dispatch<T>(this: SchedulerAction<DispatchState<T>>, state: DispatchState<T>) {
const self = this;
const { args, subscriber, params } = state;
const { callbackFunc, context, scheduler } = params;
Expand Down Expand Up @@ -204,7 +203,7 @@ interface NextState<T> {
value: T;
}

function dispatchNext<T>(this: Action<NextState<T>>, state: NextState<T>) {
function dispatchNext<T>(this: SchedulerAction<NextState<T>>, state: NextState<T>) {
const { value, subject } = state;
subject.next(value);
subject.complete();
Expand All @@ -215,7 +214,7 @@ interface ErrorState<T> {
err: any;
}

function dispatchError<T>(this: Action<ErrorState<T>>, state: ErrorState<T>) {
function dispatchError<T>(this: SchedulerAction<ErrorState<T>>, state: ErrorState<T>) {
const { err, subject } = state;
subject.error(err);
}
25 changes: 12 additions & 13 deletions src/internal/observable/bindNodeCallback.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import { Observable } from '../Observable';
import { IScheduler } from '../Scheduler';
import { AsyncSubject } from '../AsyncSubject';
import { Subscriber } from '../Subscriber';
import { Action } from '../scheduler/Action';
import { SchedulerAction, SchedulerLike } from '../types';

/* tslint:disable:max-line-length */
export function bindNodeCallback<R>(callbackFunc: (callback: (err: any, result: R) => any) => any, scheduler?: IScheduler): () => Observable<R>;
export function bindNodeCallback<T, R>(callbackFunc: (v1: T, callback: (err: any, result: R) => any) => any, scheduler?: IScheduler): (v1: T) => Observable<R>;
export function bindNodeCallback<T, T2, R>(callbackFunc: (v1: T, v2: T2, callback: (err: any, result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2) => Observable<R>;
export function bindNodeCallback<T, T2, T3, R>(callbackFunc: (v1: T, v2: T2, v3: T3, callback: (err: any, result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3) => Observable<R>;
export function bindNodeCallback<T, T2, T3, T4, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, callback: (err: any, result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4) => Observable<R>;
export function bindNodeCallback<T, T2, T3, T4, T5, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, callback: (err: any, result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => Observable<R>;
export function bindNodeCallback<T, T2, T3, T4, T5, T6, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6, callback: (err: any, result: R) => any) => any, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => Observable<R>;
export function bindNodeCallback<T>(callbackFunc: Function, scheduler?: IScheduler): (...args: any[]) => Observable<T>;
export function bindNodeCallback<R>(callbackFunc: (callback: (err: any, result: R) => any) => any, scheduler?: SchedulerLike): () => Observable<R>;
export function bindNodeCallback<T, R>(callbackFunc: (v1: T, callback: (err: any, result: R) => any) => any, scheduler?: SchedulerLike): (v1: T) => Observable<R>;
export function bindNodeCallback<T, T2, R>(callbackFunc: (v1: T, v2: T2, callback: (err: any, result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2) => Observable<R>;
export function bindNodeCallback<T, T2, T3, R>(callbackFunc: (v1: T, v2: T2, v3: T3, callback: (err: any, result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2, v3: T3) => Observable<R>;
export function bindNodeCallback<T, T2, T3, T4, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, callback: (err: any, result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2, v3: T3, v4: T4) => Observable<R>;
export function bindNodeCallback<T, T2, T3, T4, T5, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, callback: (err: any, result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => Observable<R>;
export function bindNodeCallback<T, T2, T3, T4, T5, T6, R>(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6, callback: (err: any, result: R) => any) => any, scheduler?: SchedulerLike): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => Observable<R>;
export function bindNodeCallback<T>(callbackFunc: Function, scheduler?: SchedulerLike): (...args: any[]) => Observable<T>;
/* tslint:enable:max-line-length */

/**
Expand Down Expand Up @@ -118,7 +117,7 @@ export function bindNodeCallback<T>(callbackFunc: Function, scheduler?: ISchedul
* @name bindNodeCallback
*/
export function bindNodeCallback<T>(callbackFunc: Function,
scheduler?: IScheduler): (...args: any[]) => Observable<T> {
scheduler?: SchedulerLike): (...args: any[]) => Observable<T> {
return function(this: any, ...args: any[]): Observable<T> {
const params: ParamsState<T> = {
subject: undefined,
Expand Down Expand Up @@ -168,12 +167,12 @@ interface DispatchState<T> {
interface ParamsState<T> {
callbackFunc: Function;
args: any[];
scheduler: IScheduler;
scheduler: SchedulerLike;
subject: AsyncSubject<T>;
context: any;
}

function dispatch<T>(this: Action<DispatchState<T>>, state: DispatchState<T>) {
function dispatch<T>(this: SchedulerAction<DispatchState<T>>, state: DispatchState<T>) {
const { params, subscriber, context } = state;
const { callbackFunc, args, scheduler } = params;
let subject = params.subject;
Expand Down
Loading