Skip to content

Commit 6738b46

Browse files
committed
feat: add optional init function
1 parent 34d3250 commit 6738b46

4 files changed

Lines changed: 183 additions & 69 deletions

File tree

src/Ework.ts

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ export interface IEworkOptions {
2525
* Default: 1.
2626
*/
2727
minFreeThreads?: number;
28+
/**
29+
* Initialization function that will be executed after spawning the worker and
30+
* before sending jobs to it.
31+
*/
32+
init?: () => any;
2833
}
2934

3035
type WorkerResolveFn<Output> = (result: Output) => void;
@@ -43,13 +48,28 @@ interface IWorkerJob<Input, Output> {
4348
reject: WorkerRejectFn;
4449
}
4550

46-
interface IWorkerMessage<Output> {
47-
id: number;
48-
type: 'result';
49-
status: 'success' | 'error';
50-
result?: Output;
51-
error?: any;
52-
}
51+
type IWorkerMessage<Output> =
52+
| {
53+
type: 'result';
54+
id: number;
55+
status: 'success';
56+
result: Output;
57+
}
58+
| {
59+
type: 'result';
60+
id: number;
61+
status: 'error';
62+
error: any;
63+
}
64+
| {
65+
type: 'init';
66+
status: 'success';
67+
}
68+
| {
69+
type: 'init';
70+
status: 'error';
71+
error: any;
72+
};
5373

5474
export class Ework<Input, Output> {
5575
private totalWorkers: number;
@@ -69,7 +89,11 @@ export class Ework<Input, Output> {
6989
throw new TypeError('options must be an object');
7090
}
7191

72-
const { maxWorkers = numCpus, minFreeThreads = 1 } = options;
92+
const {
93+
maxWorkers = numCpus,
94+
minFreeThreads = 1,
95+
init = () => null,
96+
} = options;
7397

7498
if (!Number.isInteger(maxWorkers) || maxWorkers < 1) {
7599
throw new RangeError('options.maxWorkers must be a positive integer');
@@ -80,28 +104,41 @@ export class Ework<Input, Output> {
80104
);
81105
}
82106

107+
if (typeof init !== 'function') {
108+
throw new TypeError('options.init must be a function');
109+
}
110+
111+
const initString = init.toString();
83112
const workerString = worker.toString();
84-
const workerCode = makeWorkerCode(workerString);
113+
const workerCode = makeWorkerCode(initString, workerString);
85114

86115
this.totalWorkers = Math.max(
87116
Math.min(maxWorkers, numCpus - minFreeThreads),
88117
1,
89118
);
90-
this.freeWorkers = this.totalWorkers;
119+
this.freeWorkers = 0;
91120

92121
this.workers = [];
93122
for (let i = 0; i < this.totalWorkers; i++) {
94123
const worker = spawnWorker(workerCode);
95124
const workerObj: IWorker<Input, Output> = {
96125
worker,
97-
isWorking: false,
126+
isWorking: true,
98127
job: null,
99128
};
100129
addWorkerListener(
101130
worker,
102131
'message',
103132
(message: IWorkerMessage<Output>) => {
104-
if (message.type === 'result') {
133+
if (message.type === 'init') {
134+
if (message.status === 'success') {
135+
workerObj.isWorking = false;
136+
this.freeWorkers++;
137+
this.run();
138+
} else {
139+
// TODO handle init error
140+
}
141+
} else if (message.type === 'result') {
105142
const job = workerObj.job;
106143
if (job === null) {
107144
throw new Error('UNREACHABLE');
@@ -110,9 +147,6 @@ export class Ework<Input, Output> {
110147
throw new Error('UNREACHABLE');
111148
}
112149
if (message.status === 'success') {
113-
if (message.result === undefined) {
114-
throw new Error('UNREACHABLE');
115-
}
116150
job.resolve(message.result);
117151
} else {
118152
job.reject(new Error(message.error));
@@ -121,9 +155,12 @@ export class Ework<Input, Output> {
121155
workerObj.isWorking = false;
122156
this.freeWorkers++;
123157
this.run();
158+
} else {
159+
throw new Error('UNREACHABLE');
124160
}
125161
},
126162
);
163+
worker.postMessage({ type: 'init' });
127164
this.workers.push(workerObj);
128165
}
129166

src/__tests__/Ework.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ describe('constructor errors', () => {
3636
).toThrow(/options\.minFreeThreads must be a positive integer or 0/);
3737
},
3838
);
39+
40+
it.each([null, 42, 'test'])(
41+
'should throw if init is not a function (%s)',
42+
(value: any) => {
43+
expect(
44+
() =>
45+
new Ework(() => null, {
46+
init: value,
47+
}),
48+
).toThrow(/init must be a function/);
49+
},
50+
);
3951
});
4052

4153
describe('execute', () => {
@@ -115,3 +127,28 @@ describe('map', () => {
115127
await worker.terminate();
116128
});
117129
});
130+
131+
describe('init function', () => {
132+
it('should execute and wait for the init function', async () => {
133+
const worker = new Ework(
134+
() => {
135+
// @ts-ignore
136+
return global.initValue;
137+
},
138+
{
139+
init: async () => {
140+
await new Promise((resolve) => {
141+
setTimeout(() => {
142+
// @ts-ignore
143+
global.initValue = 42;
144+
resolve();
145+
}, 500);
146+
});
147+
},
148+
},
149+
);
150+
const result = await worker.execute(null);
151+
expect(result).toBe(42);
152+
await worker.terminate();
153+
});
154+
});

src/worker.browser.ts

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,48 +28,68 @@ export function addWorkerListener(
2828
worker.addEventListener(type, (event) => listener(event.data));
2929
}
3030

31-
export function makeWorkerCode(workerString: string): string {
31+
export function makeWorkerCode(
32+
initString: string,
33+
workerString: string,
34+
): string {
3235
return `'use strict';
3336
37+
const initFunction = ${initString};
3438
const workerFunction = ${workerString};
39+
const messagePort = self;
3540
36-
self.onmessage = async (event) => {
41+
function postError(obj, error) {
42+
let errorToTransfer = error;
43+
if (error) {
44+
if (error.stack) {
45+
errorToTransfer = error.stack;
46+
} else if (error.message) {
47+
errorToTransfer = error.message;
48+
}
49+
}
50+
try {
51+
messagePort.postMessage(Object.assign({}, obj, {
52+
status: 'error',
53+
error: errorToTransfer
54+
}));
55+
} catch (e) {
56+
messagePort.postMessage({
57+
status: 'error',
58+
error: 'Work failed but error could not be transferred: ' + e.message
59+
});
60+
}
61+
}
62+
63+
messagePort.onmessage = async (event) => {
3764
const { data: message } = event;
3865
if (message.type === 'work') {
3966
try {
4067
const result = await workerFunction(message.value);
41-
self.postMessage({
68+
messagePort.postMessage({
4269
id: message.id,
4370
type: 'result',
4471
status: 'success',
4572
result
4673
});
4774
} catch (error) {
48-
let errorToTransfer = error;
49-
if (error) {
50-
if (error.stack) {
51-
errorToTransfer = error.stack;
52-
} else if (error.message) {
53-
errorToTransfer = error.message;
54-
}
55-
}
56-
try {
57-
self.postMessage({
58-
id: message.id,
59-
type: 'result',
60-
status: 'error',
61-
error: errorToTransfer
62-
});
63-
} catch (e) {
64-
self.postMessage({
65-
id: message.id,
66-
type: 'result',
67-
status: 'error',
68-
error: 'Work failed but error could not be transferred: ' + e.message
69-
});
70-
}
75+
postError({
76+
id: message.id,
77+
type: 'result'
78+
}, error);
79+
}
80+
} else if (message.type === 'init') {
81+
try {
82+
await initFunction();
83+
messagePort.postMessage({
84+
type: 'init',
85+
status: 'success'
86+
});
87+
} catch (error) {
88+
postError({
89+
type: 'init'
90+
}, error);
7191
}
7292
}
7393
};
74-
`;
94+
`;
7595
}

src/worker.ts

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,49 +31,69 @@ export function addWorkerListener(
3131
worker.addListener(type, listener);
3232
}
3333

34-
export function makeWorkerCode(workerString: string): string {
34+
export function makeWorkerCode(
35+
initString: string,
36+
workerString: string,
37+
): string {
3538
return `'use strict';
3639
3740
const { parentPort } = require('worker_threads');
3841
42+
const initFunction = ${initString};
3943
const workerFunction = ${workerString};
44+
const messagePort = parentPort;
4045
41-
parentPort.on('message', async (message) => {
46+
function postError(obj, error) {
47+
let errorToTransfer = error;
48+
if (error) {
49+
if (error.stack) {
50+
errorToTransfer = error.stack;
51+
} else if (error.message) {
52+
errorToTransfer = error.message;
53+
}
54+
}
55+
try {
56+
messagePort.postMessage(Object.assign({}, obj, {
57+
status: 'error',
58+
error: errorToTransfer
59+
}));
60+
} catch (e) {
61+
messagePort.postMessage({
62+
status: 'error',
63+
error: 'Work failed but error could not be transferred: ' + e.message
64+
});
65+
}
66+
}
67+
68+
messagePort.on('message', async (message) => {
4269
if (message.type === 'work') {
4370
try {
4471
const result = await workerFunction(message.value);
45-
parentPort.postMessage({
72+
messagePort.postMessage({
4673
id: message.id,
4774
type: 'result',
4875
status: 'success',
4976
result
5077
});
5178
} catch (error) {
52-
let errorToTransfer = error;
53-
if (error) {
54-
if (error.stack) {
55-
errorToTransfer = error.stack;
56-
} else if (error.message) {
57-
errorToTransfer = error.message;
58-
}
59-
}
60-
try {
61-
parentPort.postMessage({
62-
id: message.id,
63-
type: 'result',
64-
status: 'error',
65-
error: errorToTransfer
66-
});
67-
} catch (e) {
68-
parentPort.postMessage({
69-
id: message.id,
70-
type: 'result',
71-
status: 'error',
72-
error: 'Work failed but error could not be transferred: ' + e.message
73-
});
74-
}
79+
postError({
80+
id: message.id,
81+
type: 'result'
82+
}, error);
83+
}
84+
} else if (message.type === 'init') {
85+
try {
86+
await initFunction();
87+
messagePort.postMessage({
88+
type: 'init',
89+
status: 'success'
90+
});
91+
} catch (error) {
92+
postError({
93+
type: 'init'
94+
}, error);
7595
}
7696
}
7797
});
78-
`;
98+
`;
7999
}

0 commit comments

Comments
 (0)