Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ jobs:
- run: npm install
- run: npm run coverage
- name: Publish Coveralls
if: ${{ github.event_name == 'pull_request' }}
uses: coverallsapp/github-action@master
with:
github-token: ${{ secrets.github_token }}
- name: Publish Codecov
if: ${{ github.event_name == 'pull_request' }}
uses: codecov/codecov-action@v1
with:
file: ./coverage/lcov.info
33 changes: 32 additions & 1 deletion src/scheduler/scheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ describe('Scheduler', () => {
expect((await r1).data).toBe(sampleText());
}));

test('Requests are resolved in Time order', run(async scheduler => {
const results = await Promise.all([
scheduler.scheduleRequest(createRequestEcho('One')),
scheduler.scheduleRequest(createRequestEcho('Two')),
scheduler.scheduleRequest(createRequestSleep(50), 2).catch(e => e),
scheduler.scheduleRequest(createRequestEcho('Three')),
scheduler.scheduleRequest(createRequestEcho('Four')),
]);
const timestamps = results.map(v => v.timestamp);
timestamps.reduce((a, n) => {
expect(n).toBeGreaterThanOrEqual(a)
return n;
}, 0);
}));

test('Multiple Requests', run(async scheduler => {
const lines = sampleText().split('\n');
const pResponses = lines.map((line, index) => [
Expand Down Expand Up @@ -61,17 +76,33 @@ describe('Scheduler', () => {
expect(scheduler.scheduleRequest(createRequestSpin(4000))).rejects.toEqual(expect.objectContaining({ message: expect.stringContaining('stopped')})),
expect(scheduler.scheduleRequest(createRequestSpin(2000))).rejects.toEqual(expect.objectContaining({ message: expect.stringContaining('stopped')})),
expect(delay(1).then(() => scheduler.dispose())).resolves.toEqual(expect.any(Number)),
expect(delay(5).then(() => scheduler.scheduleRequest(createRequestEcho('Too Late')))).rejects.toEqual(expect.objectContaining({ message: expect.stringContaining('stopped')})),
expect(delay(2).then(() => scheduler.dispose())).resolves.toEqual(expect.any(Number)),
expect(delay(3).then(() => scheduler.dispose())).resolves.toEqual(expect.any(Number)),
]);
});


test('Termination of single request', run(scheduler => {
const junkRequest: any = {};
const spinRequest = createRequestSpin(5000);
return Promise.all([
expect(scheduler.scheduleRequest(spinRequest)).rejects.toEqual(expect.objectContaining({ message: expect.stringContaining('Request Terminated')})),
expect(scheduler.scheduleRequest(createRequestEcho('One'))).resolves.toEqual(expect.objectContaining({ data: 'One' })),
expect(scheduler.scheduleRequest(createRequestEcho('Two'))).resolves.toEqual(expect.objectContaining({ data: 'Two' })),
expect(delay(1).then(() => scheduler.terminateRequest(spinRequest.id)))
expect(scheduler.scheduleRequest(junkRequest)).rejects.toEqual(expect.objectContaining({ message: 'Bad Request' })),
expect(scheduler.terminateRequest('Bad ID')).rejects.toEqual(expect.objectContaining({ message: 'Unknown Request' })),
expect(delay(1).then(() => scheduler.terminateRequest(spinRequest.id))),
]);
}));

test('Timeout Request', run(scheduler => {
const spinRequest = createRequestSpin(500);
return Promise.all([
expect(scheduler.scheduleRequest(createRequestEcho('One'))).resolves.toEqual(expect.objectContaining({ data: 'One' })),
expect(scheduler.scheduleRequest(createRequestEcho('Two'))).resolves.toEqual(expect.objectContaining({ data: 'Two' })),
expect(scheduler.scheduleRequest(spinRequest, 5)).rejects.toEqual(expect.objectContaining({ message: expect.stringContaining('Request Timeout')})),
expect(scheduler.scheduleRequest(createRequestEcho('Three'))).resolves.toEqual(expect.objectContaining({ data: 'Three' })),
]);
}));
});
Expand Down
31 changes: 24 additions & 7 deletions src/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import { createWorker, Worker } from '../worker/worker';
import { Request, Response, isResponse, createRequest, ErrorResponse, isErrorResponse } from '../Procedures/procedure';
import { Request, Response, isResponse, createRequest, ErrorResponse, isErrorResponse, isRequest } from '../Procedures/procedure';
import { UniqueID } from '../Procedures/uniqueId';

const defaultTimeLimitMs = 100;

export class Scheduler {
private pending: Map<UniqueID, (v: Response | Promise<Response>) => any>;
private requestQueue: Map<UniqueID, PendingRequest>;
private worker: Worker;
private currentRequest: UniqueID | undefined;
private timeoutID: NodeJS.Timeout | undefined;
private stopped = false;
public dispose: () => Promise<number>;

constructor() {
constructor(public executionTimeLimitMs = defaultTimeLimitMs) {
this.dispose = () => this._dispose();
this.worker = createWorker();
this.worker.on('message', v => this.listener(v))
Expand All @@ -19,25 +22,28 @@ export class Scheduler {
this.currentRequest = undefined;
}

public scheduleRequest<T extends Request, U extends Response>(request: T): Promise<U> {
public scheduleRequest<T extends Request, U extends Response>(request: T, timeLimitMs = this.executionTimeLimitMs): Promise<U> {
if (this.stopped) {
return Promise.reject(new ErrorCanceledRequest('Scheduler has been stopped', request.requestType, request.data))
}
if (this.requestQueue.has(request.id)) {
return this.requestQueue.get(request.id)!.promise as Promise<U>;
}
if (!isRequest(request)) {
return Promise.reject(new ErrorBadRequest('Bad Request', request))
}
const promise = new Promise<U>((resolve) => {
this.pending.set(request.id, v => resolve(v as U));
this.trigger();
}).then(r => checkResponse(r, request.data));
this.requestQueue.set(request.id, { request, promise })
this.requestQueue.set(request.id, { request, promise, timeLimitMs })
this.trigger()
return promise;
}

public terminateRequest(requestId: UniqueID): Promise<void> {
public terminateRequest(requestId: UniqueID, message = 'Request Terminated'): Promise<void> {
const pRestartWorker = (requestId === this.currentRequest) ? this.restartWorker() : Promise.resolve();
return pRestartWorker.then(() => this._terminateRequest(requestId, 'Request Terminated'));
return pRestartWorker.then(() => this._terminateRequest(requestId, message));
}

private _dispose() {
Expand Down Expand Up @@ -99,8 +105,13 @@ export class Scheduler {
if (this.currentRequest) return;
const req = this.getNextRequest();
if (!req) return;
this.currentRequest = req.request.id;
const requestId = req.request.id;
this.currentRequest = requestId;
this.timeoutID = setTimeout(() => {
this.terminateRequest(requestId, 'Request Timeout');
}, req.timeLimitMs)
this.worker.postMessage(req.request);

})
}

Expand All @@ -109,6 +120,8 @@ export class Scheduler {
this.requestQueue.delete(id);
if (this.currentRequest === id) {
this.currentRequest = undefined;
if (this.timeoutID) clearTimeout(this.timeoutID);
this.timeoutID = undefined;
}
this.trigger();
}
Expand All @@ -125,14 +138,17 @@ export class Scheduler {
}

export class ErrorCanceledRequest<T> {
readonly timestamp = Date.now();
constructor(readonly message: string, readonly requestType: string | undefined, readonly data?: T) {}
}

export class ErrorFailedRequest<T> {
readonly timestamp = Date.now();
constructor(readonly message: string, readonly requestType: string | undefined, readonly data?: T) {}
}

export class ErrorBadRequest<T> {
readonly timestamp = Date.now();
constructor(readonly message: string, readonly data?: T) {}
}

Expand All @@ -152,4 +168,5 @@ function checkResponse<T extends Response, D>(
interface PendingRequest {
request: Request;
promise: Promise<Response>;
timeLimitMs: number;
}