Skip to content

Commit

Permalink
refactor!: rewrite and optimize worker logic, clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Mar 23, 2024
1 parent 39367b0 commit f9120de
Show file tree
Hide file tree
Showing 42 changed files with 689 additions and 737 deletions.
573 changes: 148 additions & 425 deletions package-lock.json

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions package.json
Expand Up @@ -39,7 +39,8 @@
"import": "./dist/esm/index.js",
"require": "./dist/cjs/index.js",
"types": "./dist/types/index.d.ts"
}
},
"./package.json": "./package.json"
},
"typesVersions": {
"*": {
Expand All @@ -59,7 +60,7 @@
"uuid": "8.3.2"
},
"devDependencies": {
"@jest/globals": "^29.7.0",
"@jest/globals": "29.7.0",
"@types/bluebird": "3.5.36",
"@types/debug": "4.1.7",
"@types/jest": "29.5.8",
Expand All @@ -71,7 +72,7 @@
"eslint": "8.53.0",
"eslint-config-prettier": "9.0.0",
"eslint-plugin-prettier": "5.0.1",
"husky": "7.0.4",
"husky": "9.0.11",
"jest": "29.7.0",
"lint-staged": "11.1.2",
"prettier": "3.0.3",
Expand Down
5 changes: 2 additions & 3 deletions scripts/test.sh
Expand Up @@ -11,7 +11,6 @@ set -x
set -e

export NODE_ENV=test
DIRNAME=$(dirname "$0")

. "${DIRNAME}"/build.sh
export NODE_OPTIONS="$NODE_OPTIONS --trace-warnings"
npm run build
jest --runInBand --verbose --collectCoverage "$@"
2 changes: 1 addition & 1 deletion src/event-bus/event-bus-redis.ts
Expand Up @@ -119,7 +119,7 @@ export class EventBusRedis<Events extends TEventBusEvent>
return this;
}

shutDown(cb: ICallback<void>) {
shutdown(cb: ICallback<void>) {
if (this.connected) {
async.waterfall(
[
Expand Down
2 changes: 1 addition & 1 deletion src/event-bus/event-bus.ts
Expand Up @@ -91,7 +91,7 @@ export class EventBus<Events extends TEventBusEvent>
return this;
}

shutDown(cb: ICallback<void>) {
shutdown(cb: ICallback<void>) {
if (this.connected) this.connected = false;
cb();
}
Expand Down
2 changes: 1 addition & 1 deletion src/event-bus/types/event-bus.ts
Expand Up @@ -19,5 +19,5 @@ export type TEventBusEvent = TEventEmitterEvent & {

export interface IEventBus<Events extends TEventBusEvent>
extends IEventEmitter<Events> {
shutDown(cb: ICallback<void>): void;
shutdown(cb: ICallback<void>): void;
}
2 changes: 1 addition & 1 deletion src/redis-client/clients/ioredis-client.ts
Expand Up @@ -389,7 +389,7 @@ export class IoredisClient extends RedisClientAbstract {
}
}

shutDown(cb: ICallback<void> = () => void 0): void {
shutdown(cb: ICallback<void> = () => void 0): void {
if (!this.connectionClosed) {
this.client.once('end', cb);
this.client.quit();
Expand Down
2 changes: 1 addition & 1 deletion src/redis-client/clients/node-redis-client.ts
Expand Up @@ -517,7 +517,7 @@ export class NodeRedisClient extends RedisClientAbstract {
}
}

shutDown(cb: ICallback<void> = () => void 0): void {
shutdown(cb: ICallback<void> = () => void 0): void {
if (!this.connectionClosed) {
this.client.once('end', cb);
this.client.quit();
Expand Down
2 changes: 1 addition & 1 deletion src/redis-client/clients/redis-client-abstract.ts
Expand Up @@ -322,7 +322,7 @@ export abstract class RedisClientAbstract

abstract end(flush: boolean): void;

abstract shutDown(cb: ICallback<void>): void;
abstract shutdown(cb: ICallback<void>): void;

abstract getInfo(cb: ICallback<string>): void;

Expand Down
2 changes: 1 addition & 1 deletion src/redis-client/types/redis-client.ts
Expand Up @@ -246,7 +246,7 @@ export interface IRedisClient extends EventEmitter<TRedisClientEvent> {

end(flush: boolean): void;

shutDown(cb: ICallback<void>): void;
shutdown(cb: ICallback<void>): void;

getInfo(cb: ICallback<string>): void;

Expand Down
10 changes: 5 additions & 5 deletions src/worker/errors/worker-thread.error.ts
Expand Up @@ -8,17 +8,17 @@
*/

import {
EWorkerThreadExecutionCode,
EWorkerThreadExitCode,
TWorkerThreadMessage,
EWorkerThreadChildExecutionCode,
EWorkerThreadChildExitCode,
TWorkerThreadChildMessage,
} from '../types/index.js';
import { WorkerError } from './worker-error.js';

export class WorkerThreadError extends WorkerError {
constructor(msg: TWorkerThreadMessage) {
constructor(msg: TWorkerThreadChildMessage) {
const { code, error } = msg;
const messageStr = `Error code: ${
EWorkerThreadExitCode[code] ?? EWorkerThreadExecutionCode[code]
EWorkerThreadChildExitCode[code] ?? EWorkerThreadChildExecutionCode[code]
}.${error ? ` Cause: ${error.name}(${error.message})` : ''}`;
super(messageStr);
}
Expand Down
1 change: 1 addition & 0 deletions src/worker/types/index.ts
Expand Up @@ -8,3 +8,4 @@
*/

export * from './worker.js';
export * from './worker-thread.js';
71 changes: 71 additions & 0 deletions src/worker/types/worker-thread.ts
@@ -0,0 +1,71 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { EWorkerType } from './worker.js';

export interface IWorkerThreadData {
type: EWorkerType;
filename: string;
initialPayload: unknown;
}

export enum EWorkerThreadChildExitCode {
WORKER_DATA_REQUIRED = 100,
INVALID_WORKER_TYPE,
FILE_IMPORT_ERROR,
UNCAUGHT_EXCEPTION,
FILE_EXTENSION_ERROR,
FILE_READ_ERROR,
TERMINATED,
}

export enum EWorkerThreadChildExecutionCode {
OK = 200,
PROCESSING_ERROR,
PROCESSING_CAUGHT_ERROR,
}

export type TWorkerThreadChildMessageCode =
| EWorkerThreadChildExitCode
| EWorkerThreadChildExecutionCode;

export type TWorkerThreadChildError = {
name: string;
message: string;
};

export type TWorkerThreadChildMessage<Data = unknown> = {
code: TWorkerThreadChildMessageCode;
data?: Data;
error?: TWorkerThreadChildError | null;
};

export enum EWorkerThreadParentMessage {
CALL,
RUN,
SHUTDOWN,
}

export type TWorkerThreadParentMessageCall = {
type: EWorkerThreadParentMessage.CALL;
payload: unknown;
};

export type TWorkerThreadParentMessageRun = {
type: EWorkerThreadParentMessage.RUN;
};

export type TWorkerThreadParentMessageShutdown = {
type: EWorkerThreadParentMessage.SHUTDOWN;
};

export type TWorkerThreadParentMessage =
| TWorkerThreadParentMessageCall
| TWorkerThreadParentMessageRun
| TWorkerThreadParentMessageShutdown;
59 changes: 17 additions & 42 deletions src/worker/types/worker.ts
Expand Up @@ -14,49 +14,24 @@ export enum EWorkerType {
RUNNABLE,
}

// eslint-disable-next-line
export type TWorkerFn = (...args: [...any[], ICallback<any>]) => void; // type-coverage:ignore-line

export interface IWorkerRunnable<Payload> {
run(initialPayload: Payload, cb: ICallback<void>): void;
export type TWorkerCallableFunction = (
args: unknown,
cb: ICallback<unknown>,
) => void;

export type TWorkerRunnableFunctionFactory = (
initialPayload: unknown,
) => IWorkerRunnable;

export type TWorkerFunction =
| TWorkerRunnableFunctionFactory
| TWorkerCallableFunction;

export interface IWorkerRunnable {
run(cb: ICallback<void>): void;
shutdown(cb: ICallback<void>): void;
}

export interface IWorkerCallable<Payload, Reply> {
call(payload: Payload, cb: ICallback<Reply>): void;
}

export interface IWorkerData {
type: EWorkerType;
filename: string;
}

export enum EWorkerThreadExitCode {
WORKER_DATA_REQUIRED = 100,
INVALID_WORKER_TYPE,
FILE_IMPORT_ERROR,
UNCAUGHT_EXCEPTION,
FILE_EXTENSION_ERROR,
FILE_READ_ERROR,
TERMINATED,
call(args: Payload, cb: ICallback<Reply>): void;
}

export enum EWorkerThreadExecutionCode {
OK = 200,
PROCESSING_ERROR,
PROCESSING_CAUGHT_ERROR,
}

export type TWorkerThreadMessageCode =
| EWorkerThreadExitCode
| EWorkerThreadExecutionCode;

export type TWorkerThreadError = {
name: string;
message: string;
};

export type TWorkerThreadMessage = {
code: TWorkerThreadMessageCode;
data?: unknown;
error?: TWorkerThreadError | null;
};
16 changes: 12 additions & 4 deletions src/worker/worker-callable.ts
Expand Up @@ -8,22 +8,30 @@
*/

import { ICallback } from '../common/index.js';
import { EWorkerType, IWorkerCallable } from './types/index.js';
import { WorkerPayloadRequiredError } from './errors/index.js';
import {
EWorkerThreadParentMessage,
EWorkerType,
IWorkerCallable,
} from './types/index.js';
import { Worker } from './worker.js';

export class WorkerCallable<Payload, Reply>
extends Worker
extends Worker<Payload, Reply>
implements IWorkerCallable<Payload, Reply>
{
protected readonly type: EWorkerType = EWorkerType.CALLABLE;

constructor(workerFilename: string) {
super(workerFilename);
}

call(payload: Payload, cb: ICallback<Reply>) {
if (payload === null || payload === undefined) {
cb(new WorkerPayloadRequiredError());
} else {
// @ts-expect-error reply data type is known only at runtime
this.exec(payload, cb);
this.registerEvents(cb);
this.postMessage({ type: EWorkerThreadParentMessage.CALL, payload });
}
}
}
12 changes: 5 additions & 7 deletions src/worker/worker-resource-group.ts
Expand Up @@ -28,8 +28,7 @@ export class WorkerResourceGroup extends Runnable<TWorkerResourceGroupEvent> {
protected readonly locker;
protected readonly redisClient;
protected readonly logger;
protected workers: { instance: WorkerRunnable<unknown>; payload: unknown }[] =
[];
protected workers: WorkerRunnable<unknown>[] = [];
protected runWorkersLocked = false;

constructor(
Expand Down Expand Up @@ -77,8 +76,7 @@ export class WorkerResourceGroup extends Runnable<TWorkerResourceGroupEvent> {
async.each(
this.workers,
(worker, _, done) => {
const { instance, payload } = worker;
instance.run(payload, done);
worker.run(done);
},
(err) => {
this.runWorkersLocked = false;
Expand All @@ -94,7 +92,7 @@ export class WorkerResourceGroup extends Runnable<TWorkerResourceGroupEvent> {
async.each(
this.workers,
(worker, _, done) => {
worker.instance.shutDown(() => done());
worker.shutdown(() => done());
},
() => {
this.workers = [];
Expand Down Expand Up @@ -129,9 +127,9 @@ export class WorkerResourceGroup extends Runnable<TWorkerResourceGroupEvent> {
}

addWorker = (filename: string, payload: unknown): void => {
const worker = new WorkerRunnable(filename);
const worker = new WorkerRunnable(filename, payload);
worker.on('worker.error', (err) => this.handleError(err));
this.workers.push({ instance: worker, payload });
this.workers.push(worker);
};

loadFromDir = (
Expand Down

0 comments on commit f9120de

Please sign in to comment.