diff --git a/.npmignore b/.npmignore index 43960b9..345c250 100644 --- a/.npmignore +++ b/.npmignore @@ -9,7 +9,7 @@ tests/** .prettierrc index.ts tsconfig.json -jest.config.cjs +jest.config.js .idea types/** coverage/** diff --git a/index.ts b/index.ts index ae31b2f..afec3f2 100755 --- a/index.ts +++ b/index.ts @@ -10,17 +10,18 @@ export * from './types'; export * from './src/errors'; export * from './src/logger/errors'; -export * from './src/ticker/errors'; -export * from './src/lock/errors'; +export * from './src/timer/errors'; +export * from './src/locker/errors'; export * from './src/redis-client/errors'; export * from './src/worker/errors'; -export { Lock } from './src/lock/lock'; +export { Locker } from './src/locker/locker'; export { PowerSwitch } from './src/power-switch/power-switch'; export { RedisClient } from './src/redis-client/redis-client'; -export { Ticker } from './src/ticker/ticker'; -export { Worker } from './src/worker/worker'; -export { WorkerRunner } from './src/worker/worker-runner/worker-runner'; -export { WorkerPool } from './src/worker/worker-runner/worker-pool'; +export { Timer } from './src/timer/timer'; +export { Runnable } from './src/runnable/runnable'; +export { WorkerCallable } from './src/worker/worker-callable'; +export { WorkerRunnable } from './src/worker/worker-runnable'; +export { WorkerResourceGroup } from './src/worker/worker-resource-group'; export { logger } from './src/logger/logger'; export { async } from './src/async/async'; export { redis } from './src/redis-client'; diff --git a/jest.config.cjs b/jest.config.js similarity index 90% rename from jest.config.cjs rename to jest.config.js index 4d39f38..edad55a 100644 --- a/jest.config.cjs +++ b/jest.config.js @@ -12,7 +12,7 @@ const { resolve } = require('path'); module.exports = { rootDir: resolve('./'), - testMatch: ['**/dist/**/*.test.js'], + testMatch: ['/dist/tests/**/*.test.js'], setupFilesAfterEnv: ['/dist/tests/jest.setup.js'], coverageDirectory: '/coverage', }; diff --git a/package-lock.json b/package-lock.json index 3be3950..2aa0d1a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -648,6 +648,20 @@ "@babel/core": "^7.0.0-0" } }, + "node_modules/@babel/runtime": { + "version": "7.23.9", + "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.23.9.tgz", + "integrity": "sha512-0CX6F+BI2s9dkUqr08KFrAIZgNFj75rdBU/DjCyYLIaV/quFjkk6T+EJ2LkZHyZTbEV4L5p97mNkUsHl2wLFAw==", + "dev": true, + "optional": true, + "peer": true, + "dependencies": { + "regenerator-runtime": "^0.14.0" + }, + "engines": { + "node": ">=6.9.0" + } + }, "node_modules/@babel/template": { "version": "7.23.9", "resolved": "https://registry.npmjs.org/@babel/template/-/template-7.23.9.tgz", @@ -712,6 +726,32 @@ "integrity": "sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==", "dev": true }, + "node_modules/@cspotcode/source-map-support": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.8.1.tgz", + "integrity": "sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==", + "dev": true, + "optional": true, + "peer": true, + "dependencies": { + "@jridgewell/trace-mapping": "0.3.9" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/@cspotcode/source-map-support/node_modules/@jridgewell/trace-mapping": { + "version": "0.3.9", + "resolved": "https://registry.npmjs.org/@jridgewell/trace-mapping/-/trace-mapping-0.3.9.tgz", + "integrity": "sha512-3Belt6tdc8bPgAtbcmdtNJlirVoTmEb5e2gC94PnkwEW9jI6CAHUeoG85tjWP5WquqfavoMtMwiG4P926ZKKuQ==", + "dev": true, + "optional": true, + "peer": true, + "dependencies": { + "@jridgewell/resolve-uri": "^3.0.3", + "@jridgewell/sourcemap-codec": "^1.4.10" + } + }, "node_modules/@eslint-community/eslint-utils": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/@eslint-community/eslint-utils/-/eslint-utils-4.4.0.tgz", @@ -1319,6 +1359,38 @@ "@sinonjs/commons": "^3.0.0" } }, + "node_modules/@tsconfig/node10": { + "version": "1.0.9", + "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", + "integrity": "sha512-jNsYVVxU8v5g43Erja32laIDHXeoNvFEpX33OK4d6hljo3jDhCBDhx5dhCCTMWUojscpAagGiRkBKxpdl9fxqA==", + "dev": true, + "optional": true, + "peer": true + }, + "node_modules/@tsconfig/node12": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.11.tgz", + "integrity": "sha512-cqefuRsh12pWyGsIoBKJA9luFu3mRxCA+ORZvA4ktLSzIuCUtWVxGIuXigEwO5/ywWFMZ2QEGKWvkZG1zDMTag==", + "dev": true, + "optional": true, + "peer": true + }, + "node_modules/@tsconfig/node14": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.3.tgz", + "integrity": "sha512-ysT8mhdixWK6Hw3i1V2AeRqZ5WfXg1G43mqoYlM2nc6388Fq5jcXyr5mRsqViLx/GJYdoL0bfXD8nmF+Zn/Iow==", + "dev": true, + "optional": true, + "peer": true + }, + "node_modules/@tsconfig/node16": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.4.tgz", + "integrity": "sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==", + "dev": true, + "optional": true, + "peer": true + }, "node_modules/@types/babel__core": { "version": "7.20.5", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.20.5.tgz", @@ -1715,6 +1787,17 @@ "acorn": "^6.0.0 || ^7.0.0 || ^8.0.0" } }, + "node_modules/acorn-walk": { + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.3.2.tgz", + "integrity": "sha512-cjkyv4OtNCIeqhHrfS81QWXoCBPExR/J62oyEqepVw8WaQeSqpW2uhuLPh1m9eWhDuOo/jUXVTlifvesOWp/4A==", + "dev": true, + "optional": true, + "peer": true, + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/aggregate-error": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-3.1.0.tgz", @@ -1817,6 +1900,14 @@ "node": ">= 8" } }, + "node_modules/arg": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", + "dev": true, + "optional": true, + "peer": true + }, "node_modules/argparse": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", @@ -1918,6 +2009,23 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/babel-plugin-macros": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/babel-plugin-macros/-/babel-plugin-macros-3.1.0.tgz", + "integrity": "sha512-Cg7TFGpIr01vOQNODXOOaGz2NpCU5gl8x1qJFbb6hbZxR7XrcE2vtbAsTAbJ7/xwJtUuJEw8K8Zr/AE0LHlesg==", + "dev": true, + "optional": true, + "peer": true, + "dependencies": { + "@babel/runtime": "^7.12.5", + "cosmiconfig": "^7.0.0", + "resolve": "^1.19.0" + }, + "engines": { + "node": ">=10", + "npm": ">=6" + } + }, "node_modules/babel-preset-current-node-syntax": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/babel-preset-current-node-syntax/-/babel-preset-current-node-syntax-1.0.1.tgz", @@ -2295,6 +2403,14 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/create-require": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", + "dev": true, + "optional": true, + "peer": true + }, "node_modules/cross-spawn": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", @@ -2371,6 +2487,17 @@ "node": ">=8" } }, + "node_modules/diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", + "dev": true, + "optional": true, + "peer": true, + "engines": { + "node": ">=0.3.1" + } + }, "node_modules/diff-sequences": { "version": "29.6.3", "resolved": "https://registry.npmjs.org/diff-sequences/-/diff-sequences-29.6.3.tgz", @@ -4235,6 +4362,14 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/make-error": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", + "dev": true, + "optional": true, + "peer": true + }, "node_modules/makeerror": { "version": "1.0.12", "resolved": "https://registry.npmjs.org/makeerror/-/makeerror-1.0.12.tgz", @@ -4834,6 +4969,14 @@ "node": ">=4" } }, + "node_modules/regenerator-runtime": { + "version": "0.14.1", + "resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.14.1.tgz", + "integrity": "sha512-dYnhHh0nJoMfnkZs6GmmhFknAGRrLznOu5nc9ML+EJxGvrx6H7teuevqVqCuPcPK//3eDrrjQhehXVx9cnkGdw==", + "dev": true, + "optional": true, + "peer": true + }, "node_modules/require-directory": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz", @@ -5318,6 +5461,51 @@ "typescript": ">=4.2.0" } }, + "node_modules/ts-node": { + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.9.2.tgz", + "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", + "dev": true, + "optional": true, + "peer": true, + "dependencies": { + "@cspotcode/source-map-support": "^0.8.0", + "@tsconfig/node10": "^1.0.7", + "@tsconfig/node12": "^1.0.7", + "@tsconfig/node14": "^1.0.0", + "@tsconfig/node16": "^1.0.2", + "acorn": "^8.4.1", + "acorn-walk": "^8.1.1", + "arg": "^4.1.0", + "create-require": "^1.1.0", + "diff": "^4.0.1", + "make-error": "^1.1.1", + "v8-compile-cache-lib": "^3.0.1", + "yn": "3.1.1" + }, + "bin": { + "ts-node": "dist/bin.js", + "ts-node-cwd": "dist/bin-cwd.js", + "ts-node-esm": "dist/bin-esm.js", + "ts-node-script": "dist/bin-script.js", + "ts-node-transpile-only": "dist/bin-transpile.js", + "ts-script": "dist/bin-script-deprecated.js" + }, + "peerDependencies": { + "@swc/core": ">=1.2.50", + "@swc/wasm": ">=1.2.50", + "@types/node": "*", + "typescript": ">=2.7" + }, + "peerDependenciesMeta": { + "@swc/core": { + "optional": true + }, + "@swc/wasm": { + "optional": true + } + } + }, "node_modules/tslib": { "version": "2.6.2", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", @@ -5496,6 +5684,14 @@ "uuid": "dist/bin/uuid" } }, + "node_modules/v8-compile-cache-lib": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", + "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", + "dev": true, + "optional": true, + "peer": true + }, "node_modules/v8-to-istanbul": { "version": "9.2.0", "resolved": "https://registry.npmjs.org/v8-to-istanbul/-/v8-to-istanbul-9.2.0.tgz", @@ -5620,6 +5816,17 @@ "node": ">=12" } }, + "node_modules/yn": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", + "dev": true, + "optional": true, + "peer": true, + "engines": { + "node": ">=6" + } + }, "node_modules/yocto-queue": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz", diff --git a/package.json b/package.json index 9ab157f..0ce789d 100644 --- a/package.json +++ b/package.json @@ -30,9 +30,8 @@ "bugs": { "url": "https://github.com/weyoss/redis-smq-common/issues" }, - "type": "module", - "main": "dist/index.cjs", - "module": "dist/index.js", + "type": "commonjs", + "main": "dist/index.js", "types": "dist/index.d.ts", "dependencies": { "@redis/client": "1.1.0", @@ -66,7 +65,7 @@ "scripts": { "build": "scripts/build.sh", "test": "scripts/test.sh", - "release": "scripts/release.sh", + "release": "scripts/release/release.sh", "format": "prettier --write \"src/**/*.ts\" \"tests/**/*.ts\"", "lint": "eslint \"{src,tests,types}/**/*.ts\"", "type-coverage": "type-coverage --strict --at-least 100 --ignore-files \"dist/**/*\"", diff --git a/scripts/build.sh b/scripts/build.sh index 7a14f93..bf133e2 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -10,7 +10,10 @@ set -x set -e +npm run type-coverage +npm run lint +npm run format rm -rf dist -tsc +tsc -p ./tsconfig.json cp -r src/redis-client/lua dist/src/redis-client/ -cp -r src/lock/redis-client dist/src/lock/ \ No newline at end of file +cp -r src/locker/redis-client dist/src/locker/ diff --git a/scripts/test.sh b/scripts/test.sh index 5eb49f1..5ac11c0 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -12,9 +12,4 @@ set -e export NODE_ENV=test npm run build -cat >dist/package.json < + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { EventBusError } from './event-bus.error'; + +export class EventBusConnectionError extends EventBusError {} diff --git a/src/event/errors/event-bus.error.ts b/src/event/errors/event-bus.error.ts new file mode 100644 index 0000000..085324d --- /dev/null +++ b/src/event/errors/event-bus.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { RedisSMQError } from '../../errors'; + +export class EventBusError extends RedisSMQError {} diff --git a/src/event/errors/index.ts b/src/event/errors/index.ts new file mode 100644 index 0000000..065525a --- /dev/null +++ b/src/event/errors/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export { EventBusError } from './event-bus.error'; +export { EventBusConnectionError } from './event-bus-connection.error'; diff --git a/src/event/event-bus-redis/event-bus-redis.ts b/src/event/event-bus-redis/event-bus-redis.ts new file mode 100644 index 0000000..a935133 --- /dev/null +++ b/src/event/event-bus-redis/event-bus-redis.ts @@ -0,0 +1,149 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { RedisClient } from '../../redis-client/redis-client'; +import { + ICallback, + IEventBus, + IRedisConfig, + TEventEmitterEvent, +} from '../../../types'; +import { async } from '../../async/async'; +import { EventBusConnectionError } from '../errors'; +import { redis } from '../../redis-client'; +import { EventEmitter } from '../event-emitter'; + +export class EventBusRedis + extends EventEmitter + implements IEventBus +{ + protected static instance: IEventBus | null = null; + protected connected = false; + protected pubClient; + protected subClient; + + protected constructor(pubClient: RedisClient, subClient: RedisClient) { + super(); + this.pubClient = pubClient; + this.subClient = subClient; + this.subClient.on('message', (channel: keyof Events, message: string) => { + this.eventEmitter.emit(String(channel), ...JSON.parse(message)); + }); + this.connected = true; + } + + override emit( + event: E, + ...args: Parameters + ): boolean { + if (!this.connected) { + throw new EventBusConnectionError(); + } + this.pubClient.publish(String(event), JSON.stringify(args), () => void 0); + return true; + } + + override on(event: E, listener: Events[E]): this { + if (!this.connected) { + throw new EventBusConnectionError(); + } + this.subClient.subscribe(String(event)); + super.on(event, listener); + return this; + } + + override once(event: E, listener: Events[E]): this { + if (!this.connected) { + throw new EventBusConnectionError(); + } + this.subClient.subscribe(String(event)); + super.once(event, listener); + return this; + } + + override removeAllListeners( + event?: Extract, + ): this { + if (!this.connected) { + throw new EventBusConnectionError(); + } + if (event) this.subClient.unsubscribe(String(event)); + else this.subClient.unsubscribe(); + super.removeAllListeners(event); + return this; + } + + override removeListener( + event: E, + listener: Events[E], + ): this { + if (!this.connected) { + throw new EventBusConnectionError(); + } + super.removeListener(event, listener); + return this; + } + + disconnect(cb: ICallback) { + if (this.connected) { + async.waterfall( + [ + (cb: ICallback) => this.subClient.halt(() => cb()), + (cb: ICallback) => this.pubClient.halt(() => cb()), + ], + () => { + this.connected = false; + if (this === EventBusRedis.instance) EventBusRedis.instance = null; + cb(); + }, + ); + } else cb(); + } + + static getInstance( + config: IRedisConfig, + cb: ICallback>, + ): void { + if (!EventBusRedis.instance) { + let pubClient: RedisClient | null | undefined = null; + let subClient: RedisClient | null | undefined = null; + async.waterfall( + [ + (cb: ICallback) => + redis.createInstance(config, (err, client) => { + if (err) cb(err); + else { + pubClient = client; + cb(); + } + }), + (cb: ICallback) => + redis.createInstance(config, (err, client) => { + if (err) cb(err); + else { + subClient = client; + cb(); + } + }), + ], + (err) => { + if (err) { + if (pubClient) pubClient.halt(() => void 0); + if (subClient) subClient.halt(() => void 0); + cb(err); + } else if (!pubClient || !subClient) cb(new Error()); + else { + EventBusRedis.instance = new EventBusRedis(pubClient, subClient); + cb(null, EventBusRedis.instance); + } + }, + ); + } else cb(null, EventBusRedis.instance); + } +} diff --git a/src/event/event-bus.ts b/src/event/event-bus.ts deleted file mode 100644 index ba26be0..0000000 --- a/src/event/event-bus.ts +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { EventEmitter } from './event-emitter'; -import { TEvent } from '../../types'; - -let instance: EventEmitter | null = null; - -export function getEventBusInstance(): EventEmitter { - if (!instance) { - instance = new EventEmitter(); - } - return instance; -} diff --git a/src/event/event-bus/event-bus.ts b/src/event/event-bus/event-bus.ts new file mode 100644 index 0000000..5d07731 --- /dev/null +++ b/src/event/event-bus/event-bus.ts @@ -0,0 +1,89 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { EventEmitter } from '../event-emitter'; +import { ICallback, IEventBus, TEventEmitterEvent } from '../../../types'; +import { EventBusConnectionError } from '../errors'; + +export class EventBus + extends EventEmitter + implements IEventBus +{ + protected static instance: IEventBus | null = null; + protected connected = false; + + protected constructor() { + super(); + this.connected = true; + } + + override emit( + event: E, + ...args: Parameters + ): boolean { + if (!this.connected) { + throw new EventBusConnectionError(); + } + return super.emit(event, ...args); + } + + override on(event: E, listener: Events[E]): this { + if (!this.connected) { + throw new EventBusConnectionError(); + } + super.on(event, listener); + return this; + } + + override once(event: E, listener: Events[E]): this { + if (!this.connected) { + throw new EventBusConnectionError(); + } + super.once(event, listener); + return this; + } + + override removeAllListeners( + event?: Extract, + ): this { + if (!this.connected) { + throw new EventBusConnectionError(); + } + super.removeAllListeners(event); + return this; + } + + override removeListener( + event: E, + listener: Events[E], + ): this { + if (!this.connected) { + throw new EventBusConnectionError(); + } + super.removeListener(event, listener); + return this; + } + + disconnect(cb: ICallback) { + if (this.connected) { + this.connected = false; + if (this === EventBus.instance) EventBus.instance = null; + } + cb(); + } + + static getInstance( + cb: ICallback>, + ): void { + if (!EventBus.instance) { + EventBus.instance = new EventBus(); + } + cb(null, EventBus.instance); + } +} diff --git a/src/event/event-emitter.ts b/src/event/event-emitter.ts index d5b41c2..ad1ecc8 100644 --- a/src/event/event-emitter.ts +++ b/src/event/event-emitter.ts @@ -7,24 +7,43 @@ * in the root directory of this source tree. */ -import { EventEmitter as Base } from 'events'; +import { EventEmitter as NodeEventEmitter } from 'events'; +import { IEventEmitter, TEventEmitterEvent } from '../../types'; -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export type TEventEmitterEvent = Record any>; // type-coverage:ignore-line +// A typed EventEmitter around Node's EventEmitter with a limited set of methods +export class EventEmitter + implements IEventEmitter +{ + protected eventEmitter; + constructor() { + this.eventEmitter = new NodeEventEmitter(); + } + + on(event: E, listener: Events[E]): this { + this.eventEmitter.on(String(event), listener); + return this; + } + + once(event: E, listener: Events[E]): this { + this.eventEmitter.once(String(event), listener); + return this; + } -export declare interface EventEmitter - extends Base { - on(event: E, listener: Events[E]): this; - once(event: E, listener: Events[E]): this; emit( event: E, ...args: Parameters - ): boolean; - removeAllListeners(event?: E): this; -} + ): boolean { + return this.eventEmitter.emit(String(event), ...args); + } -// eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging -export class EventEmitter< - // eslint-disable-next-line @typescript-eslint/no-unused-vars - Events extends TEventEmitterEvent, -> extends Base {} + removeAllListeners(event?: Extract): this { + if (event) this.eventEmitter.removeAllListeners(event); + else this.eventEmitter.removeAllListeners(); + return this; + } + + removeListener(event: E, listener: Events[E]): this { + this.eventEmitter.removeListener(String(event), listener); + return this; + } +} diff --git a/src/event/index.ts b/src/event/index.ts index f031c97..fe98dfb 100644 --- a/src/event/index.ts +++ b/src/event/index.ts @@ -8,4 +8,5 @@ */ export * from './event-emitter'; -export * from './event-bus'; +export * from './event-bus-redis/event-bus-redis'; +export * from './event-bus/event-bus'; diff --git a/src/lock/errors/lock-not-released.error.ts b/src/lock/errors/lock-not-released.error.ts deleted file mode 100644 index 2e4ae0c..0000000 --- a/src/lock/errors/lock-not-released.error.ts +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { LockError } from './lock.error'; - -export class LockNotReleasedError extends LockError { - constructor( - message = `A lock has been already obtained but not yet released or maybe a pending operation is in progress.`, - ) { - super(message); - } -} diff --git a/src/lock/lock.ts b/src/lock/lock.ts deleted file mode 100644 index 5ba0fa2..0000000 --- a/src/lock/lock.ts +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { ICallback, TEvent } from '../../types'; -import { RedisClient } from '../redis-client/redis-client'; -import { v4 as uuid } from 'uuid'; -import { - LockAbortError, - LockAcquireError, - LockExtendError, - LockMethodNotAllowedError, - LockNotAcquiredError, - LockNotReleasedError, -} from './errors'; -import { ELuaScript } from './redis-client/redis-client'; -import { EventEmitter } from '../event'; - -export enum ELockStatus { - unlocked, - locking, - locked, - releasing, - extending, - extended, -} - -export class Lock extends EventEmitter { - protected readonly lockId: string; - protected readonly lockKey: string; - protected readonly retryOnFail: boolean; - protected readonly ttl: number; - protected readonly redisClient: RedisClient; - protected readonly autoExtend: boolean; - - protected status: ELockStatus = ELockStatus.unlocked; - protected lockingTimer: NodeJS.Timeout | null = null; - protected autoExtendTimer: NodeJS.Timeout | null = null; - - constructor( - redisClient: RedisClient, - lockKey: string, - ttl: number, - retryOnFail = false, - autoExtend = false, - ) { - super(); - this.lockKey = lockKey; - this.ttl = ttl; - this.retryOnFail = retryOnFail; - this.lockId = uuid(); - this.redisClient = redisClient; - this.autoExtend = autoExtend; - } - - protected resetTimers(): void { - if (this.lockingTimer) { - clearTimeout(this.lockingTimer); - this.lockingTimer = null; - } - if (this.autoExtendTimer) { - clearTimeout(this.autoExtendTimer); - this.autoExtendTimer = null; - } - } - - protected setUnlocked(): void { - this.status = ELockStatus.unlocked; - } - - protected setLocked(): void { - this.status = ELockStatus.locked; - } - - protected setExtended(): void { - this.status = ELockStatus.extended; - } - - protected extend(cb: ICallback): void { - if (!this.isLocked()) cb(new LockNotAcquiredError()); - else { - this.status = ELockStatus.extending; - this.redisClient.runScript( - ELuaScript.EXTEND_LOCK, - [this.lockKey], - [this.lockId, this.ttl], - (err, reply) => { - if (err) cb(err); - else { - if (this.status === ELockStatus.extending) { - if (reply !== 1) { - this.setUnlocked(); - cb(new LockExtendError()); - } else { - this.setExtended(); - cb(); - } - } else { - cb(new LockAbortError()); - } - } - }, - ); - } - } - - protected runAutoExtendTimer(): void { - const ms = Math.ceil(this.ttl / 2); - this.autoExtendTimer = setTimeout( - () => - this.extend((err) => { - if (err) { - if (!(err instanceof LockAbortError)) this.emit('error', err); - } else this.runAutoExtendTimer(); - }), - ms, - ); - } - - acquireLock(cb: ICallback): void { - if (!this.isReleased()) cb(new LockNotReleasedError()); - else { - this.status = ELockStatus.locking; - const lock = () => { - if (this.status === ELockStatus.locking) { - this.redisClient.set( - this.lockKey, - this.lockId, - { - expire: { - mode: 'PX', - value: this.ttl, - }, - exists: 'NX', - }, - (err, reply) => { - if (err) cb(err); - else if (this.status === ELockStatus.locking) { - if (!reply) { - if (this.retryOnFail) - this.lockingTimer = setTimeout(lock, 1000); - else { - this.setUnlocked(); - cb(new LockAcquireError()); - } - } else { - this.setLocked(); - if (this.autoExtend) { - this.runAutoExtendTimer(); - } - cb(); - } - } else { - cb(new LockAbortError()); - } - }, - ); - } else { - cb(new LockAbortError()); - } - }; - lock(); - } - } - - extendLock(cb: ICallback): void { - if (this.autoExtend) cb(new LockMethodNotAllowedError()); - else this.extend(cb); - } - - releaseLock(cb: ICallback): void { - const status = this.status; - if (status === ELockStatus.unlocked) cb(); - else if (!this.isLocked()) cb(new LockNotAcquiredError()); - else { - this.resetTimers(); - this.status = ELockStatus.releasing; - this.redisClient.runScript( - ELuaScript.RELEASE_LOCK, - [this.lockKey], - [this.lockId], - (err) => { - if (err) cb(err); - else { - this.setUnlocked(); - cb(); - } - }, - ); - } - } - - acquireOrExtend(cb: ICallback): void { - if (this.autoExtend) cb(new LockMethodNotAllowedError()); - else { - const lock = () => { - this.acquireLock((err) => { - if (err) cb(err); - else cb(null, ELockStatus.locked); - }); - }; - if (this.isLocked()) - this.extend((err) => { - if (err) { - if (err instanceof LockExtendError) lock(); - else cb(err); - } else cb(null, ELockStatus.extended); - }); - else lock(); - } - } - - isLocked(): boolean { - return ( - this.status === ELockStatus.locked || this.status === ELockStatus.extended - ); - } - - isReleased(): boolean { - return this.status === ELockStatus.unlocked; - } - - getId(): string { - return this.lockId; - } -} diff --git a/src/lock/errors/index.ts b/src/locker/errors/index.ts similarity index 89% rename from src/lock/errors/index.ts rename to src/locker/errors/index.ts index b043e79..3169d12 100644 --- a/src/lock/errors/index.ts +++ b/src/locker/errors/index.ts @@ -13,4 +13,3 @@ export { LockAcquireError } from './lock-acquire.error'; export { LockExtendError } from './lock-extend.error'; export { LockMethodNotAllowedError } from './lock-method-not-allowed.error'; export { LockNotAcquiredError } from './lock-not-acquired.error'; -export { LockNotReleasedError } from './lock-not-released.error'; diff --git a/src/lock/errors/lock-abort.error.ts b/src/locker/errors/lock-abort.error.ts similarity index 100% rename from src/lock/errors/lock-abort.error.ts rename to src/locker/errors/lock-abort.error.ts diff --git a/src/lock/errors/lock-acquire.error.ts b/src/locker/errors/lock-acquire.error.ts similarity index 100% rename from src/lock/errors/lock-acquire.error.ts rename to src/locker/errors/lock-acquire.error.ts diff --git a/src/lock/errors/lock-extend.error.ts b/src/locker/errors/lock-extend.error.ts similarity index 100% rename from src/lock/errors/lock-extend.error.ts rename to src/locker/errors/lock-extend.error.ts diff --git a/src/lock/errors/lock-method-not-allowed.error.ts b/src/locker/errors/lock-method-not-allowed.error.ts similarity index 100% rename from src/lock/errors/lock-method-not-allowed.error.ts rename to src/locker/errors/lock-method-not-allowed.error.ts diff --git a/src/lock/errors/lock-not-acquired.error.ts b/src/locker/errors/lock-not-acquired.error.ts similarity index 100% rename from src/lock/errors/lock-not-acquired.error.ts rename to src/locker/errors/lock-not-acquired.error.ts diff --git a/src/lock/errors/lock.error.ts b/src/locker/errors/lock.error.ts similarity index 100% rename from src/lock/errors/lock.error.ts rename to src/locker/errors/lock.error.ts diff --git a/src/locker/locker.ts b/src/locker/locker.ts new file mode 100644 index 0000000..b4b8512 --- /dev/null +++ b/src/locker/locker.ts @@ -0,0 +1,174 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ICallback, ILogger } from '../../types'; +import { RedisClient } from '../redis-client/redis-client'; +import { + LockAbortError, + LockAcquireError, + LockExtendError, + LockMethodNotAllowedError, + LockNotAcquiredError, +} from './errors'; +import { ELuaScript } from './redis-client/redis-client'; +import { Runnable } from '../runnable/runnable'; +import { Timer } from '../timer/timer'; + +export type TLockerEvent = { + 'locker.up': (id: string) => void; + 'locker.down': (id: string) => void; + 'locker.goingUp': (id: string) => void; + 'locker.goingDown': (id: string) => void; + 'locker.error': (error: Error) => void; +}; + +export class Locker extends Runnable { + protected readonly lockKey; + protected readonly retryOnFail; + protected readonly ttl; + protected readonly redisClient; + protected readonly autoExtendInterval; + protected readonly timer; + + constructor( + redisClient: RedisClient, + logger: ILogger, + lockKey: string, + ttl: number, + retryOnFail = false, + autoExtendInterval: number = 0, + ) { + super(logger); + this.lockKey = lockKey; + this.ttl = ttl; + this.retryOnFail = retryOnFail; + this.redisClient = redisClient; + this.autoExtendInterval = autoExtendInterval; + this.timer = new Timer(); + this.on('locker.error', (err) => this.handleError(err)); + } + + protected lock = (cb: ICallback) => { + this.redisClient.set( + this.lockKey, + this.id, + { + expire: { + mode: 'PX', + value: this.ttl, + }, + exists: 'NX', + }, + (err, reply) => { + if (err) cb(err); + else if (this.powerSwitch.isGoingUp()) { + if (!reply) { + if (this.retryOnFail) + this.timer.setTimeout(() => this.lock(cb), 1000); + else cb(new LockAcquireError()); + } else cb(); + } else { + cb(new LockAbortError()); + } + }, + ); + }; + + protected extend = (cb: ICallback) => { + if (!this.isRunning()) cb(new LockNotAcquiredError()); + else { + this.redisClient.runScript( + ELuaScript.EXTEND_LOCK, + [this.lockKey], + [this.id, this.ttl], + (err, reply) => { + if (err) cb(err); + else { + if (this.powerSwitch.isRunning()) { + if (reply !== 1) { + this.forceShutdown(() => cb(new LockExtendError())); + } else cb(); + } else cb(new LockAbortError()); + } + }, + ); + } + }; + + protected release = (cb: ICallback): void => { + this.redisClient.runScript( + ELuaScript.RELEASE_LOCK, + [this.lockKey], + [this.id], + (err) => cb(err), + ); + }; + + protected tearDownTicker = (cb: ICallback): void => { + this.timer.reset(); + cb(); + }; + + protected autoExtendLock(): void { + if (this.autoExtendInterval) { + this.timer.setTimeout( + () => + this.extend((err) => { + if (err) { + if (!(err instanceof LockAbortError)) + this.emit('locker.error', err); + } else this.autoExtendLock(); + }), + this.autoExtendInterval, + ); + } + } + + protected override goingUp(): ((cb: ICallback) => void)[] { + return super.goingUp().concat([this.lock]); + } + + protected override goingDown(): ((cb: ICallback) => void)[] { + return [this.tearDownTicker, this.release].concat(super.goingDown()); + } + + override run(cb: ICallback) { + super.run((err, reply) => { + if (err) { + if (err instanceof LockAcquireError) cb(null, false); + else cb(err); + } else { + if (reply) this.autoExtendLock(); + cb(null, Boolean(reply)); + } + }); + } + + acquireLock(cb: ICallback) { + this.run(cb); + } + + releaseLock(cb: ICallback) { + this.shutdown(cb); + } + + extendLock(cb: ICallback): void { + if (this.autoExtendInterval) cb(new LockMethodNotAllowedError()); + else if (!this.powerSwitch.isRunning()) cb(new LockNotAcquiredError()); + else this.extend(cb); + } + + isLocked(): boolean { + return this.powerSwitch.isRunning(); + } + + isReleased(): boolean { + return this.powerSwitch.isDown(); + } +} diff --git a/src/lock/redis-client/lua/extend-lock.lua b/src/locker/redis-client/lua/extend-lock.lua similarity index 100% rename from src/lock/redis-client/lua/extend-lock.lua rename to src/locker/redis-client/lua/extend-lock.lua diff --git a/src/lock/redis-client/lua/release-lock.lua b/src/locker/redis-client/lua/release-lock.lua similarity index 100% rename from src/lock/redis-client/lua/release-lock.lua rename to src/locker/redis-client/lua/release-lock.lua diff --git a/src/lock/redis-client/redis-client.ts b/src/locker/redis-client/redis-client.ts similarity index 100% rename from src/lock/redis-client/redis-client.ts rename to src/locker/redis-client/redis-client.ts diff --git a/src/redis-client/clients/ioredis-client-multi.ts b/src/redis-client/clients/ioredis-client-multi.ts index b1733c3..4d44d39 100644 --- a/src/redis-client/clients/ioredis-client-multi.ts +++ b/src/redis-client/clients/ioredis-client-multi.ts @@ -8,9 +8,8 @@ */ import { ICallback, IRedisTransaction } from '../../../types'; -import { RedisClientError } from '../errors'; +import { RedisClientError, WatchedKeysChangedError } from '../errors'; import { Pipeline, Redis } from 'ioredis'; -import { WatchedKeysChangedError } from '../errors'; export class IoredisClientMulti implements IRedisTransaction { protected multi: Pipeline; diff --git a/src/redis-client/clients/ioredis-client.ts b/src/redis-client/clients/ioredis-client.ts index 0677f0f..e631d19 100644 --- a/src/redis-client/clients/ioredis-client.ts +++ b/src/redis-client/clients/ioredis-client.ts @@ -154,16 +154,18 @@ export class IoredisClient extends RedisClient { this.client.psubscribe(pattern); } - punsubscribe(channel: string): void { - this.client.punsubscribe(channel); + punsubscribe(channel?: string): void { + if (channel) this.client.punsubscribe(channel); + else this.client.punsubscribe(); } subscribe(channel: string): void { this.client.subscribe(channel); } - unsubscribe(channel: string): void { - this.client.unsubscribe(channel); + unsubscribe(channel?: string): void { + if (channel) this.client.unsubscribe(channel); + else this.client.unsubscribe(); } zrangebyscore( diff --git a/src/redis-client/clients/node-redis-client.ts b/src/redis-client/clients/node-redis-client.ts index 709113d..6bc4a4c 100644 --- a/src/redis-client/clients/node-redis-client.ts +++ b/src/redis-client/clients/node-redis-client.ts @@ -152,7 +152,7 @@ export class NodeRedisClient extends RedisClient { }); } - punsubscribe(channel: string): void { + punsubscribe(channel?: string): void { this.client.pUnsubscribe(channel).catch(() => void 0); } @@ -162,7 +162,7 @@ export class NodeRedisClient extends RedisClient { }); } - unsubscribe(channel: string): void { + unsubscribe(channel?: string): void { this.client.unsubscribe(channel).catch(() => void 0); } diff --git a/src/redis-client/lua-script.ts b/src/redis-client/lua-script.ts index 6aaf4c5..4f153ef 100644 --- a/src/redis-client/lua-script.ts +++ b/src/redis-client/lua-script.ts @@ -7,7 +7,7 @@ * in the root directory of this source tree. */ -import { ICallback, IRedisClient } from '../../types'; +import type { ICallback, IRedisClient } from '../../types'; import { CallbackEmptyReplyError } from '../errors'; import { RedisClientError } from './errors'; import { async } from '../async/async'; diff --git a/src/redis-client/redis-client.ts b/src/redis-client/redis-client.ts index 60a67da..8b36bea 100644 --- a/src/redis-client/redis-client.ts +++ b/src/redis-client/redis-client.ts @@ -98,11 +98,11 @@ export abstract class RedisClient abstract psubscribe(pattern: string): void; - abstract punsubscribe(channel: string): void; + abstract punsubscribe(channel?: string): void; abstract subscribe(channel: string): void; - abstract unsubscribe(channel: string): void; + abstract unsubscribe(channel?: string): void; abstract zrangebyscore( key: string, diff --git a/src/runnable/runnable.ts b/src/runnable/runnable.ts new file mode 100644 index 0000000..130b8c6 --- /dev/null +++ b/src/runnable/runnable.ts @@ -0,0 +1,98 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { PowerSwitch } from '../power-switch/power-switch'; +import { ICallback, ILogger, TEventEmitterEvent } from '../../types'; +import { v4 as uuid } from 'uuid'; +import { async } from '../async/async'; +import { EventEmitter } from '../event'; + +export abstract class Runnable< + RunnableEvent extends TEventEmitterEvent, +> extends EventEmitter { + protected id; + protected powerSwitch; + protected logger; + protected forceShutdownOnError = true; + + protected constructor(logger: ILogger) { + super(); + this.id = uuid(); + this.powerSwitch = new PowerSwitch(); + this.logger = logger; + } + + protected goingUp(): ((cb: ICallback) => void)[] { + return []; + } + + protected goingDown(): ((cb: ICallback) => void)[] { + return []; + } + + protected up(cb: ICallback): void { + this.powerSwitch.commit(); + cb(null, true); + } + + protected down(cb: ICallback): void { + this.powerSwitch.commit(); + cb(null, true); + } + + protected forceShutdown(cb: ICallback): void { + if (this.powerSwitch.isGoingUp()) this.powerSwitch.rollback(); + if (this.powerSwitch.isRunning()) this.powerSwitch.goingDown(); + const tasks = this.goingDown(); + async.waterfall(tasks, () => { + if (this.powerSwitch.isGoingDown()) { + this.down(() => cb()); + } else cb(); + }); + } + + protected handleError(err: Error): void { + if (this.powerSwitch.isGoingUp() || this.powerSwitch.isRunning()) { + this.logger.error(err); + this.forceShutdown(() => void 0); + } + } + + isRunning() { + return this.powerSwitch.isRunning() || this.powerSwitch.goingUp(); + } + + run(cb: ICallback): void { + const r = this.powerSwitch.goingUp(); + if (r) { + const tasks = this.goingUp(); + async.waterfall(tasks, (err) => { + if (err) { + if (this.forceShutdownOnError) this.forceShutdown(() => cb(err)); + else cb(err); + } else this.up(cb); + }); + } else cb(null, r); + } + + shutdown(cb: ICallback): void { + const r = this.powerSwitch.goingDown(); + if (r) { + const tasks = this.goingDown(); + async.waterfall(tasks, () => { + // ignoring shutdown errors + this.down(cb); + }); + } else cb(null, r); + } + + getId(): string { + return this.id; + } +} diff --git a/src/ticker/ticker.ts b/src/ticker/ticker.ts deleted file mode 100644 index 06d2b78..0000000 --- a/src/ticker/ticker.ts +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { TFunction, TEvent } from '../../types'; -import { PowerSwitch } from '../power-switch/power-switch'; -import { TickerError } from './errors'; -import { PanicError } from '../errors'; -import { EventEmitter } from '../event'; - -export class Ticker extends EventEmitter { - protected powerManager = new PowerSwitch(); - protected onTickFn: TFunction; - protected onNextTickFn: TFunction | null = null; - protected time: number; - - protected timeout: NodeJS.Timeout | null = null; - protected interval: NodeJS.Timeout | null = null; - protected shutdownTimeout: NodeJS.Timeout | null = null; - protected aborted = false; - - constructor(onTickFn: TFunction = () => void 0, time = 1000) { - super(); - this.onTickFn = onTickFn; - this.time = time; - this.powerManager.goingUp(); - } - - protected shutdown(): void { - if (this.shutdownTimeout) { - clearTimeout(this.shutdownTimeout); - } - this.powerManager.commit(); - this.emit('down'); - } - - protected onTick(): void { - if (this.powerManager.isGoingDown()) { - this.shutdown(); - } else if (this.powerManager.isRunning()) { - const tickFn = this.onNextTickFn ?? this.onTickFn; - this.onNextTickFn = null; - tickFn(); - } else { - this.emit('error', new PanicError(`Unexpected call`)); - } - } - - abort(): void { - if (!this.aborted) { - this.aborted = true; - if (this.powerManager.isGoingDown()) this.shutdown(); - else this.quit(); - } - } - - quit(): void { - if (this.powerManager.isGoingUp()) { - this.powerManager.rollback(); - this.emit('down'); - } else if (this.aborted && this.powerManager.isDown()) { - this.emit('down'); - } else { - this.powerManager.goingDown(); - if (this.timeout) { - clearTimeout(this.timeout); - this.shutdown(); - } else if (this.interval) { - clearInterval(this.interval); - this.shutdown(); - } else if (this.aborted) { - this.shutdown(); - } else { - // waiting 1 min for nextTick() - this.shutdownTimeout = setTimeout(() => { - if (this.powerManager.isGoingDown()) { - this.shutdown(); - } - }, 60000); - } - } - } - - isTicking(): boolean { - return !!(this.timeout || this.interval); - } - - nextTick(): void { - if (this.isTicking()) { - this.emit('error', new TickerError('A timer is already running')); - } else { - if (this.powerManager.isGoingDown()) { - this.shutdown(); - } else { - if (this.powerManager.isGoingUp()) { - this.powerManager.commit(); - } - if (this.powerManager.isRunning()) { - this.timeout = setTimeout(() => { - this.timeout = null; - this.onTick(); - }, this.time); - } - } - } - } - - nextTickFn(fn: TFunction): void { - this.onNextTickFn = fn; - this.nextTick(); - } - - runTimer(): void { - if (this.isTicking()) { - this.emit('error', new TickerError('A timer is already running')); - } else { - if (this.powerManager.isGoingUp()) { - this.powerManager.commit(); - } - if (this.powerManager.isRunning()) { - this.interval = setInterval(() => this.onTick(), this.time); - } - } - } -} diff --git a/src/ticker/errors/index.ts b/src/timer/errors/index.ts similarity index 82% rename from src/ticker/errors/index.ts rename to src/timer/errors/index.ts index 3a8eab1..0954fec 100644 --- a/src/ticker/errors/index.ts +++ b/src/timer/errors/index.ts @@ -7,4 +7,4 @@ * in the root directory of this source tree. */ -export { TickerError } from './ticker.error'; +export { TimerError } from './timer.error'; diff --git a/src/ticker/errors/ticker.error.ts b/src/timer/errors/timer.error.ts similarity index 84% rename from src/ticker/errors/ticker.error.ts rename to src/timer/errors/timer.error.ts index 3a3b869..57638d3 100644 --- a/src/ticker/errors/ticker.error.ts +++ b/src/timer/errors/timer.error.ts @@ -9,4 +9,4 @@ import { RedisSMQError } from '../../errors'; -export class TickerError extends RedisSMQError {} +export class TimerError extends RedisSMQError {} diff --git a/src/timer/timer.ts b/src/timer/timer.ts new file mode 100644 index 0000000..9599892 --- /dev/null +++ b/src/timer/timer.ts @@ -0,0 +1,60 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { EventEmitter } from '../event'; +import { TTimer, TTimerEvent } from '../../types/timer'; +import { TimerError } from './errors'; +import { TFunction } from '../../types'; + +export class Timer extends EventEmitter { + protected timer: TTimer | null = null; + + protected onTick = () => { + if (!this.timer) + this.emit('error', new TimerError('Expected a non-empty timer property')); + else { + const { fn, periodic } = this.timer; + if (!periodic) this.timer = null; + fn(); + } + }; + + setTimeout(fn: TFunction, timeout: number): boolean { + if (this.timer) { + return false; + } + this.timer = { + timer: setTimeout(() => this.onTick(), timeout), + periodic: false, + fn, + }; + return true; + } + + setInterval(fn: TFunction, interval = 1000): boolean { + if (this.timer) { + return false; + } + this.timer = { + timer: setInterval(() => this.onTick(), interval), + periodic: true, + fn, + }; + return true; + } + + reset() { + if (this.timer) { + const { timer, periodic } = this.timer; + if (periodic) clearInterval(timer); + else clearTimeout(timer); + this.timer = null; + } + } +} diff --git a/src/worker/errors/index.ts b/src/worker/errors/index.ts index fb6fc92..e8edb8d 100644 --- a/src/worker/errors/index.ts +++ b/src/worker/errors/index.ts @@ -8,3 +8,7 @@ */ export { WorkerError } from './worker-error'; +export { WorkerThreadError } from './worker-thread.error'; +export { WorkerPayloadRequiredError } from './worker-payload-required.error'; +export { WorkerAlreadyRunningError } from './worker-already-running.error'; +export { WorkerAlreadyDownError } from './worker-already-down.error'; diff --git a/src/worker/errors/worker-already-down.error.ts b/src/worker/errors/worker-already-down.error.ts new file mode 100644 index 0000000..9593e5d --- /dev/null +++ b/src/worker/errors/worker-already-down.error.ts @@ -0,0 +1,16 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { WorkerError } from './worker-error'; + +export class WorkerAlreadyDownError extends WorkerError { + constructor() { + super(`Worker is going/already down`); + } +} diff --git a/src/worker/errors/worker-already-running.error.ts b/src/worker/errors/worker-already-running.error.ts new file mode 100644 index 0000000..cc8b3c4 --- /dev/null +++ b/src/worker/errors/worker-already-running.error.ts @@ -0,0 +1,16 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { WorkerError } from './worker-error'; + +export class WorkerAlreadyRunningError extends WorkerError { + constructor() { + super(`Worker is going/already up`); + } +} diff --git a/src/worker/errors/worker-error.ts b/src/worker/errors/worker-error.ts index 00d6f2b..e5ccd6d 100644 --- a/src/worker/errors/worker-error.ts +++ b/src/worker/errors/worker-error.ts @@ -7,6 +7,6 @@ * in the root directory of this source tree. */ -import { PanicError } from '../../errors'; +import { RedisSMQError } from '../../errors'; -export class WorkerError extends PanicError {} +export class WorkerError extends RedisSMQError {} diff --git a/src/worker/errors/worker-payload-required.error.ts b/src/worker/errors/worker-payload-required.error.ts new file mode 100644 index 0000000..d1b478b --- /dev/null +++ b/src/worker/errors/worker-payload-required.error.ts @@ -0,0 +1,16 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { WorkerError } from './worker-error'; + +export class WorkerPayloadRequiredError extends WorkerError { + constructor() { + super(`Worker payload is required`); + } +} diff --git a/src/worker/errors/worker-thread.error.ts b/src/worker/errors/worker-thread.error.ts new file mode 100644 index 0000000..16799f6 --- /dev/null +++ b/src/worker/errors/worker-thread.error.ts @@ -0,0 +1,25 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { WorkerError } from './worker-error'; +import { + EWorkerThreadExecutionCode, + EWorkerThreadExitCode, + TWorkerThreadMessage, +} from '../../../types/worker'; + +export class WorkerThreadError extends WorkerError { + constructor(msg: TWorkerThreadMessage) { + const { code, error } = msg; + const messageStr = `Error code: ${ + EWorkerThreadExitCode[code] ?? EWorkerThreadExecutionCode[code] + }.${error ? ` Cause: ${error.name}(${error.message})` : ''}`; + super(messageStr); + } +} diff --git a/src/worker/worker-callable.ts b/src/worker/worker-callable.ts new file mode 100644 index 0000000..2c41feb --- /dev/null +++ b/src/worker/worker-callable.ts @@ -0,0 +1,29 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { EWorkerType, IWorkerCallable } from '../../types/worker'; +import { Worker } from './worker'; +import { ICallback } from '../../types'; +import { WorkerPayloadRequiredError } from './errors'; + +export class WorkerCallable + extends Worker + implements IWorkerCallable +{ + protected readonly type: EWorkerType = EWorkerType.CALLABLE; + + call(payload: Payload, cb: ICallback) { + if (payload === null || payload === undefined) { + cb(new WorkerPayloadRequiredError()); + } else { + // @ts-expect-error reply data type is known only at runtime + this.exec(payload, cb); + } + } +} diff --git a/src/worker/worker-resource-group.ts b/src/worker/worker-resource-group.ts new file mode 100644 index 0000000..c810430 --- /dev/null +++ b/src/worker/worker-resource-group.ts @@ -0,0 +1,150 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ICallback, ILogger } from '../../types'; +import { PowerSwitch } from '../power-switch/power-switch'; +import { Locker } from '../locker/locker'; +import { RedisClient } from '../redis-client/redis-client'; +import { async } from '../async/async'; +import { WorkerRunnable } from './worker-runnable'; +import { Runnable } from '../runnable/runnable'; +import { readdir } from 'fs'; +import path from 'path'; + +export type TWorkerResourceGroupEvent = { + 'workerResourceGroup.error': ( + err: Error, + workerResourceGroupId: string, + ) => void; +}; + +export class WorkerResourceGroup extends Runnable { + protected readonly powerManager: PowerSwitch; + protected readonly locker: Locker; + protected readonly redisClient: RedisClient; + protected workers: { instance: WorkerRunnable; payload: unknown }[] = + []; + + constructor( + redisClient: RedisClient, + logger: ILogger, + resourceGroupId: string, + ) { + super(logger); + this.powerManager = new PowerSwitch(); + this.redisClient = redisClient; + this.logger = logger; + + // Locker + this.locker = new Locker( + redisClient, + logger, + resourceGroupId, + 60000, + true, + 15000, + ); + this.locker.on('locker.error', (err) => { + this.emit('workerResourceGroup.error', err, this.id); + }); + this.on('workerResourceGroup.error', (err) => { + this.handleError(err); + }); + } + + protected runWorkers = (): void => { + async.waterfall( + [ + (cb: ICallback) => { + this.locker.acquireLock((err) => { + if (err) cb(err); + else { + this.logger.info( + `Workers are exclusively running from this instance (Lock ID ${this.locker.getId()}).`, + ); + cb(); + } + }); + }, + (cb: ICallback) => { + async.each( + this.workers, + (worker, _, done) => { + const { instance, payload } = worker; + instance.run(payload, done); + }, + cb, + ); + }, + ], + (err) => { + if (err) this.emit('workerResourceGroup.error', err, this.id); + }, + ); + }; + + protected shutDownWorkers = (cb: ICallback): void => { + async.each( + this.workers, + (worker, _, done) => { + worker.instance.quit(() => done()); + }, + () => { + this.workers = []; + cb(); + }, + ); + }; + + protected releaseLock = (cb: ICallback) => { + this.locker.releaseLock((err) => cb(err)); + }; + + addWorker = (filename: string, payload: unknown): void => { + const worker = new WorkerRunnable(filename); + worker.on('worker.error', (err) => { + this.emit('workerResourceGroup.error', err, this.id); + }); + this.workers.push({ instance: worker, payload }); + }; + + loadFromDir = ( + workersDir: string, + payload: unknown, + cb: ICallback, + ): void => { + readdir(workersDir, (err, files) => { + if (err) cb(err); + else { + async.each( + files ?? [], + (file, _, done) => { + if (file.endsWith('.worker.js')) { + const filepath = path.resolve(workersDir, file); + this.addWorker(filepath, payload); + done(); + } else done(); + }, + (err) => cb(err), + ); + } + }); + }; + + protected override goingDown(): ((cb: ICallback) => void)[] { + return [this.shutDownWorkers, this.releaseLock].concat(super.goingDown()); + } + + protected override up(cb: ICallback): void { + super.up(() => { + this.runWorkers(); + cb(); + }); + } +} diff --git a/src/worker/worker-runnable.ts b/src/worker/worker-runnable.ts new file mode 100644 index 0000000..fadb36a --- /dev/null +++ b/src/worker/worker-runnable.ts @@ -0,0 +1,53 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { EWorkerType, IWorkerRunnable } from '../../types/worker'; +import { Worker } from './worker'; +import { ICallback } from '../../types'; +import { PowerSwitch } from '../power-switch/power-switch'; +import { WorkerAlreadyDownError, WorkerAlreadyRunningError } from './errors'; + +export class WorkerRunnable + extends Worker + implements IWorkerRunnable +{ + protected readonly type: EWorkerType = EWorkerType.RUNNABLE; + protected readonly powerSwitch; + + constructor(workerFilename: string) { + super(workerFilename); + this.powerSwitch = new PowerSwitch(); + } + + run(initialPayload: Payload, cb: ICallback) { + const r = this.powerSwitch.goingUp(); + if (r) { + this.exec(initialPayload, (err) => { + if (err) { + this.powerSwitch.rollback(); + cb(err); + } else { + this.powerSwitch.commit(); + this.registerEvents(this); + cb(); + } + }); + } else cb(new WorkerAlreadyRunningError()); + } + + override quit(cb: ICallback) { + const r = this.powerSwitch.goingDown(); + if (r) { + super.quit(() => { + this.powerSwitch.commit(); + cb(); + }); + } else cb(new WorkerAlreadyDownError()); + } +} diff --git a/src/worker/worker-runner/worker-pool.ts b/src/worker/worker-runner/worker-pool.ts deleted file mode 100644 index 2bf19f3..0000000 --- a/src/worker/worker-runner/worker-pool.ts +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { Worker } from '../worker'; -import { ICallback } from '../../../types'; -import { async } from '../../async/async'; - -export class WorkerPool { - private pool: Worker[] = []; - - work = (cb: ICallback): void => { - if (this.pool.length) { - const tasks = this.pool.map( - (worker) => (cb: ICallback) => worker.work(cb), - ); - async.waterfall(tasks, cb); - } else cb(); - }; - - add = (worker: Worker): number => { - this.pool.push(worker); - return this.pool.length; - }; - - clear = (cb: ICallback): void => { - async.each( - this.pool, - (worker, _, done) => { - worker.quit(done); - }, - () => { - this.pool = []; - cb(); - }, - ); - }; -} diff --git a/src/worker/worker-runner/worker-runner.ts b/src/worker/worker-runner/worker-runner.ts deleted file mode 100644 index 3f93dfe..0000000 --- a/src/worker/worker-runner/worker-runner.ts +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { ICallback, ILogger, TEvent } from '../../../types'; -import { PowerSwitch } from '../../power-switch/power-switch'; -import { Ticker } from '../../ticker/ticker'; -import { ELockStatus, Lock } from '../../lock/lock'; -import { RedisClient } from '../../redis-client/redis-client'; -import { WorkerPool } from './worker-pool'; -import { Worker } from '../worker'; -import { async } from '../../async/async'; -import { LockAcquireError } from '../../lock/errors'; -import { EventEmitter } from '../../event'; - -export class WorkerRunner extends EventEmitter { - private readonly powerManager: PowerSwitch; - private readonly ticker: Ticker; - private readonly lock: Lock; - private readonly redisClient: RedisClient; - private readonly logger: ILogger; - private readonly workerPool: WorkerPool; - - constructor( - redisClient: RedisClient, - keyLock: string, - workerPool: WorkerPool, - logger: ILogger, - ) { - super(); - this.powerManager = new PowerSwitch(); - this.redisClient = redisClient; - this.logger = logger; - this.lock = new Lock(redisClient, keyLock, 60000); - this.ticker = new Ticker(this.onTick); - this.workerPool = workerPool; - } - - private onTick = (): void => { - async.waterfall( - [ - (cb: ICallback) => { - this.lock.acquireOrExtend((err, status) => { - if (status === ELockStatus.locked) { - this.logger.info( - `Workers are exclusively running from this instance (Lock ID ${this.lock.getId()}).`, - ); - } - cb(err); - }); - }, - (cb: ICallback) => { - this.workerPool.work(cb); - }, - ], - (err) => { - if (!err || err instanceof LockAcquireError) this.ticker.nextTick(); - else this.emit('error', err); - }, - ); - }; - - private clearWorkerPool = (cb: ICallback): void => { - this.workerPool.clear(cb); - }; - - private stopTicker = (cb: ICallback) => { - this.ticker.once('down', cb); - this.ticker.quit(); - }; - - private releaseLock = (cb: ICallback) => { - this.lock.releaseLock(cb); - }; - - addWorker(instance: Worker): void { - this.workerPool.add(instance); - } - - run = (cb: ICallback): void => { - this.ticker.nextTick(); - this.emit('up'); - cb(); - }; - - quit = (cb: ICallback): void => { - async.waterfall( - [this.stopTicker, this.clearWorkerPool, this.releaseLock], - () => { - this.emit('down'); - cb(); - }, - ); - }; -} diff --git a/src/worker/worker-thread.ts b/src/worker/worker-thread.ts new file mode 100644 index 0000000..29292cd --- /dev/null +++ b/src/worker/worker-thread.ts @@ -0,0 +1,139 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { + isMainThread, + MessagePort, + parentPort, + // type-coverage:ignore-next-line + workerData, +} from 'worker_threads'; +import { + EWorkerThreadExecutionCode, + EWorkerThreadExitCode, + EWorkerType, + IWorkerData, + TWorkerFn, + TWorkerThreadMessage, + TWorkerThreadMessageCode, +} from '../../types/worker'; +import path from 'path'; +import { statSync } from 'fs'; + +function importWorkerFn( + filename: string, + cb: (worker: TWorkerFn) => void, +): void { + import(filename) + .then((importedModule: { default?: TWorkerFn } | TWorkerFn) => { + const fn = + typeof importedModule !== 'function' && importedModule.default + ? importedModule.default + : importedModule; + if (typeof fn !== 'function') { + exit(EWorkerThreadExitCode.INVALID_WORKER_TYPE); + } else cb(fn); + }) + .catch((err: unknown) => { + console.error(err); + exit(EWorkerThreadExitCode.FILE_IMPORT_ERROR); + }); +} + +function formatErrorMessage( + code: TWorkerThreadMessageCode, + err?: unknown, +): TWorkerThreadMessage { + const error = + err && err instanceof Error + ? { name: err.name, message: err.message } + : null; + return { + code, + error, + }; +} + +function formatOKMessage(data: unknown): TWorkerThreadMessage { + return { + code: EWorkerThreadExecutionCode.OK, + data, + }; +} + +function postMessage( + messagePort: MessagePort, + code: TWorkerThreadMessageCode, + err?: unknown, + data?: unknown, +) { + const msg = + code === EWorkerThreadExecutionCode.OK + ? formatOKMessage(data) + : formatErrorMessage(code, err); + messagePort.postMessage(msg); +} + +function exit(code: TWorkerThreadMessageCode, err?: unknown) { + parentPort && postMessage(parentPort, code, err); + process.exit(code); +} + +if (!isMainThread && parentPort) { + // type-coverage:ignore-next-line + if (!workerData) { + exit(EWorkerThreadExitCode.WORKER_DATA_REQUIRED); + } else { + const messagePort: MessagePort = parentPort; + + const { type, filename }: IWorkerData = workerData; + + if (!['.js', '.cjs'].includes(path.extname(filename))) { + exit(EWorkerThreadExitCode.FILE_EXTENSION_ERROR); + } + + try { + statSync(filename); + } catch (e: unknown) { + exit(EWorkerThreadExitCode.FILE_READ_ERROR, e); + } + + importWorkerFn(filename, (workerFn) => { + const callback = (err?: Error | null, reply?: unknown) => { + if (err) { + postMessage( + messagePort, + EWorkerThreadExecutionCode.PROCESSING_ERROR, + err, + ); + } else { + postMessage(messagePort, EWorkerThreadExecutionCode.OK, null, reply); + } + }; + + const onMessage = (...args: unknown[]) => { + try { + workerFn(...args, callback); + } catch (err: unknown) { + postMessage( + messagePort, + EWorkerThreadExecutionCode.PROCESSING_CAUGHT_ERROR, + err, + ); + } + }; + if (type === EWorkerType.CALLABLE) messagePort.on('message', onMessage); + else messagePort.once('message', onMessage); + }); + } + + process.on('uncaughtException', (err) => { + exit(EWorkerThreadExitCode.UNCAUGHT_EXCEPTION, err); + }); +} diff --git a/src/worker/worker.ts b/src/worker/worker.ts index 00a65d7..def390b 100644 --- a/src/worker/worker.ts +++ b/src/worker/worker.ts @@ -7,71 +7,104 @@ * in the root directory of this source tree. */ -import { Ticker } from '../ticker/ticker'; -import { ICallback, TEvent } from '../../types'; -import { PowerSwitch } from '../power-switch/power-switch'; -import { WorkerError } from './errors'; +import { ICallback } from '../../types'; +import { Worker as WorkerThread } from 'worker_threads'; +import path from 'path'; +import { + EWorkerThreadExecutionCode, + EWorkerThreadExitCode, + EWorkerType, + TWorkerThreadMessage, +} from '../../types/worker'; +import { WorkerThreadError } from './errors'; import { EventEmitter } from '../event'; -export abstract class Worker extends EventEmitter { - private readonly ticker: Ticker | null = null; - private readonly powerManager: PowerSwitch | null = null; - private readonly managed: boolean; +export type TWorkerEvent = { + 'worker.error': (err: Error) => void; + 'worker.data': (payload: unknown) => void; +}; - constructor(managed: boolean, timeout = 1000) { +export abstract class Worker extends EventEmitter { + protected abstract readonly type: EWorkerType; + protected readonly workerFilename; + protected workerThread: WorkerThread | null = null; + + constructor(workerFilename: string) { super(); - this.managed = managed; - if (!managed) { - this.ticker = new Ticker(this.onTick, timeout); - this.powerManager = new PowerSwitch(); - } + this.workerFilename = workerFilename; } - private getTicker = (): Ticker => { - if (!this.ticker) { - throw new WorkerError(`Expected an instance of Ticker`); + protected getWorkerThread(): WorkerThread { + if (!this.workerThread) { + this.workerThread = new WorkerThread( + path.resolve(__dirname, './worker-thread.js'), + { + workerData: { filename: this.workerFilename, type: this.type }, + }, + ); + this.workerThread.on('messageerror', (err) => { + console.error(err); + }); + this.workerThread.on('error', (err) => { + console.error(err); + }); + this.workerThread.on('exit', () => { + this.workerThread = null; + }); } - return this.ticker; - }; + return this.workerThread; + } - private getPowerManager(): PowerSwitch { - if (!this.powerManager) { - throw new WorkerError('Expected an instance of PowerSwitch'); - } - return this.powerManager; + protected registerEvents(cb: ICallback | Worker): void { + const worker = this.getWorkerThread(); + const cleanUp = () => { + worker + .removeListener('message', onMessage) + .removeListener('exit', onExit); + }; + const callback: ICallback = (err, data) => { + if (err) { + if (cb instanceof Worker) { + this.emit('worker.error', err); + } else cb(err); + } else { + if (cb instanceof Worker) this.emit('worker.data', data); + else cb(null, data); + } + }; + const onMessage = (msg: TWorkerThreadMessage) => { + cleanUp(); + if (msg.code !== EWorkerThreadExecutionCode.OK) { + console.error(`WorkerThreadError`, msg); + callback(new WorkerThreadError(msg)); + } else callback(null, msg.data); + }; + const onExit = () => { + cleanUp(); + const msg = { + code: EWorkerThreadExitCode.TERMINATED, + error: null, + }; + console.error('WorkerThreadError', msg); + callback(new WorkerThreadError(msg)); + }; + worker.once('message', onMessage); + worker.once('exit', onExit); } - private onTick = (): void => { - this.work((err) => { - if (err) this.emit('error', err); - else this.getTicker().nextTick(); - }); - }; + protected exec(payload: unknown, cb: ICallback): void { + this.registerEvents(cb); + if (!(payload === null || payload === undefined)) + this.getWorkerThread().postMessage(payload); + } - run = (cb: ICallback): void => { - if (this.managed) cb(new WorkerError('You can not run a managed worker')); - else { - const powerManager = this.getPowerManager(); - powerManager.goingUp(); - const ticker = this.getTicker(); - ticker.nextTick(); - powerManager.commit(); + quit(cb: ICallback) { + const callback = () => { + this.workerThread = null; cb(); - } - }; - - quit = (cb: ICallback): void => { - if (!this.managed) { - const powerManager = this.getPowerManager(); - powerManager.goingDown(); - const ticker = this.getTicker(); - ticker.on('down', () => { - powerManager.commit(); - cb(); - }); - ticker.quit(); + }; + if (this.workerThread) { + this.workerThread.terminate().then(callback).catch(callback); } else cb(); - }; - - abstract work(cb: ICallback): void; + } } diff --git a/types/event/event-bus.ts b/types/event/event-bus.ts new file mode 100644 index 0000000..b39e8a2 --- /dev/null +++ b/types/event/event-bus.ts @@ -0,0 +1,16 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ICallback } from '../common'; +import { IEventEmitter, TEventEmitterEvent } from './event-emitter'; + +export interface IEventBus + extends IEventEmitter { + disconnect(cb: ICallback): void; +} diff --git a/types/event/event-emitter.ts b/types/event/event-emitter.ts new file mode 100644 index 0000000..51f0abf --- /dev/null +++ b/types/event/event-emitter.ts @@ -0,0 +1,22 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type TEventEmitterEvent = Record void>; // type-coverage:ignore-line + +export interface IEventEmitter { + on(event: E, listener: Events[E]): this; + once(event: E, listener: Events[E]): this; + emit( + event: E, + ...args: Parameters + ): boolean; + removeAllListeners(event?: E): this; + removeListener(event: E, listener: Events[E]): this; +} diff --git a/types/event/index.ts b/types/event/index.ts index 43d10c2..f031c97 100644 --- a/types/event/index.ts +++ b/types/event/index.ts @@ -7,4 +7,5 @@ * in the root directory of this source tree. */ -export * from './event'; +export * from './event-emitter'; +export * from './event-bus'; diff --git a/types/index.ts b/types/index.ts index f236bbb..84c0254 100644 --- a/types/index.ts +++ b/types/index.ts @@ -7,7 +7,7 @@ * in the root directory of this source tree. */ -export * from './common'; -export * from './redis'; -export * from './logger'; -export * from './event'; +export * from './common.js'; +export * from './redis/index.js'; +export * from './logger/index.js'; +export * from './event/index.js'; diff --git a/types/event/event.ts b/types/timer/index.ts similarity index 61% rename from types/event/event.ts rename to types/timer/index.ts index 64c6649..69de9f7 100644 --- a/types/event/event.ts +++ b/types/timer/index.ts @@ -7,11 +7,14 @@ * in the root directory of this source tree. */ -export type TEvent = { +import { TFunction } from '../common'; + +export type TTimerEvent = { error: (err: Error) => void; - up: () => void; - down: () => void; - goingUp: () => void; - goingDown: () => void; - tick: () => void; +}; + +export type TTimer = { + timer: NodeJS.Timeout; + periodic: boolean; + fn: TFunction; }; diff --git a/types/worker/index.ts b/types/worker/index.ts new file mode 100644 index 0000000..1fed6d5 --- /dev/null +++ b/types/worker/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './worker'; diff --git a/types/worker/worker.ts b/types/worker/worker.ts new file mode 100644 index 0000000..93f25d3 --- /dev/null +++ b/types/worker/worker.ts @@ -0,0 +1,62 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ICallback } from '../common'; + +export enum EWorkerType { + CALLABLE, + RUNNABLE, +} + +// eslint-disable-next-line +export type TWorkerFn = (...args: [...any[], ICallback]) => void; // type-coverage:ignore-line + +export interface IWorkerRunnable { + run(initialPayload: Payload, cb: ICallback): void; +} + +export interface IWorkerCallable { + call(payload: Payload, cb: ICallback): void; +} + +export interface IWorkerData { + type: EWorkerType; + filename: string; +} + +export enum EWorkerThreadExitCode { + WORKER_DATA_REQUIRED = 100, + INVALID_WORKER_TYPE, + FILE_IMPORT_ERROR, + UNCAUGHT_EXCEPTION, + FILE_EXTENSION_ERROR, + FILE_READ_ERROR, + TERMINATED, +} + +export enum EWorkerThreadExecutionCode { + OK = 200, + PROCESSING_ERROR, + PROCESSING_CAUGHT_ERROR, +} + +export type TWorkerThreadMessageCode = + | EWorkerThreadExitCode + | EWorkerThreadExecutionCode; + +export type TWorkerThreadError = { + name: string; + message: string; +}; + +export type TWorkerThreadMessage = { + code: TWorkerThreadMessageCode; + data?: unknown; + error?: TWorkerThreadError | null; +};