Skip to content
Permalink
Browse files

Commited changes

  • Loading branch information...
xkizer committed Feb 26, 2019
1 parent 3dff9ff commit dfe036575d8dd28b693496865b87d829a75c1f13
Showing with 184 additions and 51 deletions.
  1. +3 −0 .gitignore
  2. +142 −40 src/context.ts
  3. +37 −10 src/worker.ts
  4. +2 −1 tsconfig.json
@@ -0,0 +1,3 @@
./dist
./build
./node_modules
@@ -10,11 +10,18 @@ export enum Actions {
CREATE_FUNCTION,
CREATE_FUNCTION_FAIL,
CREATE_FUNCTION_SUCCEED,
CALL_FUNCTION,
CALL_FUNCTION_FAIL,
CALL_FUNCTION_SUCCEED,
}

export function createContext(): ThreadContext {
const contextId = generateContextId();

let threads: WorkerThread[] | null = Array(CPU_COUNT).fill(0).map(() => {
return new WorkerThread();
const thread = new WorkerThread();
console.log(`Created thread "${thread.id}" for context "${contextId}"`);
return thread;
});

function ensureNotDestroyed() {
@@ -24,46 +31,79 @@ export function createContext(): ThreadContext {
}

return {
createThreadedFunction<A extends any[], R>(fn: (...args: A) => R): ThreadedFunction<A, R> {
async createThreadedFunction<A extends any[], R>(fn: (...args: A) => R): Promise<ThreadedFunction<A, R>> {
// Create a unique identifier for this function
ensureNotDestroyed();
const fnId = randomatic('Aa0', 32);
threads!.forEach((thread) => {
thread.postMessage({
action: Actions.CREATE_FUNCTION,
id: fnId,
definition: fn.toString(),
});
const fnId = generateFunctionId();
const definition = fn.toString();
const defs: FunctionDefs = [];
let calls = 0;

const promises = threads!.map(async (thread) => {
try {
await thread.createFunction(fnId, definition);
defs.push(thread);
console.log('Function created on thread', thread.id);
} catch (e) {
console.error('Function creation failed on thread', thread.id);
}
});

// Wait and ensure that at least one succeeded
await Promise.all(promises);
promises.length = 0;

if (defs.length === 0) {
console.error('Function creation failed on all threads');
throw new Error('Function creation failed');
}

const retFn = async function(...args: A): Promise<R> {
// choose next thread round-robin style
const callId = generateCallId();
const thread = defs[calls++ % defs.length];
return thread.callFunction<R>(fnId, callId, ...args);
};

retFn.length = fn.length;
retFn.name = fn.name;
return retFn;
},

async destroyContext() {
ensureNotDestroyed();
const terminateAll = threads!.map((thread) => {
return new Promise((res, rej) => {
thread.terminate((err: Error, exitCode: number) => {
if (err) {
rej(err);
}

res(exitCode);
});
});
});

const terminateAll = threads!.map((thread) => thread.terminate());
await Promise.all(terminateAll);
threads = null;
}
};
}

function generateFunctionId(): CallId {
type FunctionDefs = WorkerThread[];

function generateFunctionId(): FunctionId {
return randomatic('Aa0', 8);
}

interface CallId extends String {}
function generateCallId(): CallId {
return randomatic('Aa0', 24);
}

interface FunctionId extends String {}
function generateThreadId(): ThreadId {
return randomatic('Aa0', 4);
}

function generateContextId(): ContextId {
return randomatic('Aa0', 4);
}

export interface CallId extends String {}

export interface FunctionId extends String {}

export interface ThreadId extends String {}

export interface ContextId extends String {}

type FunctionCreateLog = Map<FunctionId, CallbackPair>;

@@ -81,47 +121,82 @@ class WorkerThread {

private functionCreateLog: FunctionCreateLog = new Map();

readonly id: ThreadId;

constructor() {
this.worker = new Worker(WORKER_FILE);
this.setupListeners();
this.id = generateThreadId();
}

private setupListeners() {
this.ensureNotDestroyed();
this.worker!.on('message', (message: ThreadMessage) => {
switch(message.action) {
case Actions.CREATE_FUNCTION_FAIL:
this.createFunctionFail(message.fnId, message.error);
this.createFunctionFail(message.fnId, message.error!);
break;
case Actions.CREATE_FUNCTION_SUCCEED:
this.createFunctionSucceed(message.fnId);
break;
case Actions.CALL_FUNCTION_FAIL:
this.callFunctionFail(message.callId, message.error!);
break;
case Actions.CALL_FUNCTION_SUCCEED:
this.callFunctionSucceed(message.callId, message.value);
break;
}
});
}

private createFunctionFail(functionId: CallId, error: string) {
const callbacks = this.callLog.get(functionId);
private createFunctionFail(functionId: FunctionId, error: string) { // TODO: figure out how the actual error can be transfered if possible to allow stack tracing
const callbacks = this.functionCreateLog.get(functionId);

if (!callbacks) {
// callback not found, log error and fail silently
console.error('Function creation failed, but callback not found');
return;
}

this.callLog.delete(functionId);
callbacks.fail()
this.functionCreateLog.delete(functionId);
callbacks.fail(new Error(error));
}

private createFunctionSucceed(functionId: CallId) {
const callbacks = this.callLog.get(functionId);
private createFunctionSucceed(functionId: FunctionId) {
const callbacks = this.functionCreateLog.get(functionId);

if (!callbacks) {
// callback not found, log error and fail silently
console.error('Function creation succeeded, but callback not found');
return;
}

this.callLog.delete(functionId);
callbacks.fail()
this.functionCreateLog.delete(functionId);
callbacks.success(null);
}

private callFunctionFail(callId: CallId, error: string) { // TODO: figure out how the actual error can be transfered if possible to allow stack tracing
const callbacks = this.callLog.get(callId);

if (!callbacks) {
console.error('Function call failed, but callback not found');
return;
}

this.callLog.delete(callId);
callbacks.fail(new Error(error));
}

private callFunctionSucceed(callId: CallId, result: any) { // TODO: figure out how the actual error can be transfered if possible to allow stack tracing
const callbacks = this.callLog.get(callId);

if (!callbacks) {
console.error('Function call succeeded, but callback not found');
return;
}

this.callLog.delete(callId);
callbacks.success(result);
}

ensureNotDestroyed() {
@@ -130,18 +205,40 @@ class WorkerThread {
}
}

createFunction(fn: Function) {
const fnId = generateFunctionId();
terminate() {
this.ensureNotDestroyed();
this.worker!.terminate(() => {
this.worker = null;
});
}

createFunction(fnId: FunctionId, fn: string) {
return new Promise((res, rej) => {
this.ensureNotDestroyed();
this.worker!.postMessage({
action: Actions.CREATE_FUNCTION,
id: fnId,
definition: fn.toString(),
fnId: fnId,
definition: fn,
});

this.callLog.set(fnId, {
this.functionCreateLog.set(fnId, {
fail: rej,
success: res,
});
});
}

callFunction<R>(fnId: FunctionId, callId: CallId, ...args: unknown[]) {
return new Promise<R>((res, rej) => {
this.ensureNotDestroyed();
this.worker!.postMessage({
action: Actions.CALL_FUNCTION,
fnId,
callId,
args,
});

this.callLog.set(callId, {
fail: rej,
success: res,
});
@@ -150,11 +247,16 @@ class WorkerThread {
}

export interface ThreadContext {
createThreadedFunction<A extends any[], R>(fn: (...args: A) => R): ThreadedFunction<A, R>;
createThreadedFunction<A extends any[], R>(fn: (...args: A) => R): Promise<ThreadedFunction<A, R>>;
destroyContext(): Promise<void>;
}

export type ThreadMessage = {
action: Actions;
[key: string]: any;
fnId: FunctionId;
callId: CallId;
definition?: string;
error?: string;
value?: unknown;
args?: unknown[];
};
@@ -1,39 +1,66 @@
import { isMainThread, parentPort } from 'worker_threads';
import { ThreadMessage, Actions } from './context';
import { ThreadMessage, Actions, FunctionId } from './context';

if (isMainThread || !parentPort) {
throw new Error('Worker must not be a main thread');
}

const functions: {[k: string]: Function} = {};
const functions = new Map<FunctionId, Function>();

parentPort.on('message', (message: ThreadMessage) => {
parentPort.on('message', async (message: ThreadMessage) => {
switch(message.action) {
case Actions.CREATE_FUNCTION:
try {
const id = message.id;
const fnId = message.fnId;

if (!id) {
if (!fnId) {
throw new Error('Invalid functon ID');
}

if (functions[id]) {
throw new Error(`Function with ID ${id} already exists`);
if (functions.get(fnId)) {
throw new Error(`Function with ID ${fnId} already exists`);
}

functions[id] = eval(message.definition);
functions.set(fnId, eval(message.definition!));

parentPort!.postMessage({
action: Actions.CREATE_FUNCTION_FAIL,
id: message.id,
fnId,
});
} catch (e) {
parentPort!.postMessage({
action: Actions.CREATE_FUNCTION_SUCCEED,
id: message.id,
fnId: message.fnId,
error: e.message,
});
}
break;
case Actions.CALL_FUNCTION:
try {
const callId = message.callId;
const fnId = message.fnId;
const fn = functions.get(fnId);

if (!fn) {
// Function not found, should not happen
throw new Error('Function not found');
}

// We use await to support async functions too
const value = await fn(...message.args);

parentPort!.postMessage({
action: Actions.CALL_FUNCTION_SUCCEED,
callId,
value,
});
} catch (e) {
parentPort!.postMessage({
action: Actions.CALL_FUNCTION_FAIL,
callId: message.callId,
error: e,
});
}
default:
throw new Error('Invalid message received');
}
@@ -13,7 +13,8 @@
"skipLibCheck": true,
"sourceMap": true,
"strictNullChecks": true,
"target": "es5"
"target": "es5",
"outDir": "./dist"
},
"exclude": [
"node_modules",

0 comments on commit dfe0365

Please sign in to comment.
You can’t perform that action at this time.