diff --git a/index.ts b/index.ts index 98f4587..4e67764 100755 --- a/index.ts +++ b/index.ts @@ -7,22 +7,16 @@ * in the root directory of this source tree. */ -export * from './types/index.js'; +export * from './src/async/index.js'; +export * from './src/common/index.js'; +export * from './src/env/index.js'; export * from './src/errors/index.js'; -export * from './src/logger/errors/index.js'; -export * from './src/timer/errors/index.js'; -export * from './src/locker/errors/index.js'; -export * from './src/redis-client/errors/index.js'; -export * from './src/worker/errors/index.js'; -export { Locker } from './src/locker/locker.js'; -export { PowerSwitch } from './src/power-switch/power-switch.js'; -export { RedisClient } from './src/redis-client/redis-client.js'; -export { Timer } from './src/timer/timer.js'; -export { Runnable } from './src/runnable/runnable.js'; -export { WorkerCallable } from './src/worker/worker-callable.js'; -export { WorkerRunnable } from './src/worker/worker-runnable.js'; -export { WorkerResourceGroup } from './src/worker/worker-resource-group.js'; -export { logger } from './src/logger/logger.js'; -export { async } from './src/async/async.js'; -export { redis } from './src/redis-client/index.js'; export * from './src/event/index.js'; +export * from './src/event-bus/index.js'; +export * from './src/locker/index.js'; +export * from './src/logger/index.js'; +export * from './src/power-switch/index.js'; +export * from './src/redis-client/index.js'; +export * from './src/runnable/index.js'; +export * from './src/timer/index.js'; +export * from './src/worker/index.js'; diff --git a/scripts/build.sh b/scripts/build.sh index 24f4d5e..92c22e6 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -17,8 +17,8 @@ rm -rf dist # esm tsc -p ./tsconfig.json -cp -r src/redis-client/lua dist/esm/src/redis-client/ -cp -r src/locker/redis-client dist/esm/src/locker/ +cp -r src/redis-client/lua-scripts/scripts dist/esm/src/redis-client/lua-scripts/ +cp -r src/locker/redis-client/lua-scripts/scripts dist/esm/src/locker/redis-client/lua-scripts/ # cjs tsc -p ./tsconfig.cjs.json @@ -27,8 +27,8 @@ cat >dist/cjs/package.json <( collection: T[], diff --git a/src/async/index.ts b/src/async/index.ts new file mode 100644 index 0000000..9b6806f --- /dev/null +++ b/src/async/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './async.js'; diff --git a/src/common/index.ts b/src/common/index.ts new file mode 100644 index 0000000..560170f --- /dev/null +++ b/src/common/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './types/index.js'; diff --git a/types/common.ts b/src/common/types/index.ts similarity index 100% rename from types/common.ts rename to src/common/types/index.ts diff --git a/src/env/index.ts b/src/env/index.ts new file mode 100644 index 0000000..d90900d --- /dev/null +++ b/src/env/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './environment.js'; diff --git a/src/errors/abort.error.ts b/src/errors/abort.error.ts new file mode 100644 index 0000000..e1e823a --- /dev/null +++ b/src/errors/abort.error.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { RedisSMQError } from './redis-smq.error.js'; + +export class AbortError extends RedisSMQError {} diff --git a/src/errors/index.ts b/src/errors/index.ts index 5ee8a5c..b06c6b3 100644 --- a/src/errors/index.ts +++ b/src/errors/index.ts @@ -11,3 +11,4 @@ export { CallbackEmptyReplyError } from './callback-empty-reply.error.js'; export { CallbackInvalidReplyError } from './callback-invalid-reply.error.js'; export { PanicError } from './panic.error.js'; export { RedisSMQError } from './redis-smq.error.js'; +export { AbortError } from './abort.error.js'; diff --git a/src/event/errors/event-bus-connection.error.ts b/src/event-bus/errors/event-bus-not-connected.error.ts similarity index 80% rename from src/event/errors/event-bus-connection.error.ts rename to src/event-bus/errors/event-bus-not-connected.error.ts index 7717780..418911a 100644 --- a/src/event/errors/event-bus-connection.error.ts +++ b/src/event-bus/errors/event-bus-not-connected.error.ts @@ -9,4 +9,4 @@ import { EventBusError } from './event-bus.error.js'; -export class EventBusConnectionError extends EventBusError {} +export class EventBusNotConnectedError extends EventBusError {} diff --git a/src/event/errors/event-bus.error.ts b/src/event-bus/errors/event-bus.error.ts similarity index 100% rename from src/event/errors/event-bus.error.ts rename to src/event-bus/errors/event-bus.error.ts diff --git a/src/event/errors/index.ts b/src/event-bus/errors/index.ts similarity index 77% rename from src/event/errors/index.ts rename to src/event-bus/errors/index.ts index 4efa97a..10d4f7e 100644 --- a/src/event/errors/index.ts +++ b/src/event-bus/errors/index.ts @@ -8,4 +8,4 @@ */ export { EventBusError } from './event-bus.error.js'; -export { EventBusConnectionError } from './event-bus-connection.error.js'; +export { EventBusNotConnectedError } from './event-bus-not-connected.error.js'; diff --git a/src/event-bus/event-bus-redis.ts b/src/event-bus/event-bus-redis.ts new file mode 100644 index 0000000..d4211c2 --- /dev/null +++ b/src/event-bus/event-bus-redis.ts @@ -0,0 +1,175 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { async } from '../async/index.js'; +import { ICallback } from '../common/index.js'; +import { + createRedisClient, + IRedisConfig, + IRedisClient, +} from '../redis-client/index.js'; +import { EventEmitter, IEventBus, TEventBusEvent } from '../event/index.js'; +import { EventBusNotConnectedError } from './errors/index.js'; + +export class EventBusRedis + extends EventEmitter + implements IEventBus +{ + protected connected = false; + protected pubClient; + protected subClient; + + protected constructor(pubClient: IRedisClient, subClient: IRedisClient) { + super(); + + // + this.pubClient = pubClient; + this.pubClient.on('error', (err: Error) => { + this.eventEmitter.emit('error', err); + }); + + // + this.subClient = subClient; + this.subClient.on('message', (channel: keyof Events, message: string) => { + this.eventEmitter.emit(String(channel), ...JSON.parse(message)); + }); + this.subClient.on('error', (err: Error) => { + this.eventEmitter.emit('error', err); + }); + + this.connected = true; + } + + override emit( + event: E, + ...args: Parameters + ): boolean { + if (!this.connected) { + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return false; + } + this.pubClient.publish(String(event), JSON.stringify(args), () => void 0); + return true; + } + + override on(event: E, listener: Events[E]): this { + if (event === 'error') { + super.on('error', listener); + return this; + } + if (!this.connected) { + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return this; + } + this.subClient.subscribe(String(event)); + super.on(event, listener); + return this; + } + + override once(event: E, listener: Events[E]): this { + if (event === 'error') { + super.once('error', listener); + return this; + } + if (!this.connected) { + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return this; + } + this.subClient.subscribe(String(event)); + super.once(event, listener); + return this; + } + + override removeAllListeners( + event?: Extract, + ): this { + if (event === 'error') { + super.removeAllListeners('error'); + return this; + } + if (!this.connected) { + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return this; + } + if (event) this.subClient.unsubscribe(String(event)); + else this.subClient.unsubscribe(); + super.removeAllListeners(event); + return this; + } + + override removeListener( + event: E, + listener: Events[E], + ): this { + if (event === 'error') { + super.removeListener('error', listener); + return this; + } + if (!this.connected) { + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return this; + } + super.removeListener(event, listener); + return this; + } + + shutDown(cb: ICallback) { + if (this.connected) { + async.waterfall( + [ + (cb: ICallback) => this.subClient.halt(() => cb()), + (cb: ICallback) => this.pubClient.halt(() => cb()), + ], + () => { + this.connected = false; + cb(); + }, + ); + } else cb(); + } + + static createInstance( + config: IRedisConfig, + cb: ICallback>, + ): void { + let pubClient: IRedisClient | null | undefined = null; + let subClient: IRedisClient | null | undefined = null; + async.waterfall( + [ + (cb: ICallback) => + createRedisClient(config, (err, client) => { + if (err) cb(err); + else { + pubClient = client; + cb(); + } + }), + (cb: ICallback) => + createRedisClient(config, (err, client) => { + if (err) cb(err); + else { + subClient = client; + cb(); + } + }), + ], + (err) => { + if (err) { + if (pubClient) pubClient.halt(() => void 0); + if (subClient) subClient.halt(() => void 0); + cb(err); + } else if (!pubClient || !subClient) cb(new Error()); + else { + const instance = new EventBusRedis(pubClient, subClient); + cb(null, instance); + } + }, + ); + } +} diff --git a/src/event/event-bus/event-bus.ts b/src/event-bus/event-bus.ts similarity index 51% rename from src/event/event-bus/event-bus.ts rename to src/event-bus/event-bus.ts index 48f337f..22fce15 100644 --- a/src/event/event-bus/event-bus.ts +++ b/src/event-bus/event-bus.ts @@ -7,19 +7,15 @@ * in the root directory of this source tree. */ -import { - ICallback, - IEventBus, - TEventEmitterEvent, -} from '../../../types/index.js'; -import { EventBusConnectionError } from '../errors/index.js'; -import { EventEmitter } from '../event-emitter.js'; +import { ICallback } from '../common/index.js'; +import { IEventBus, TEventBusEvent } from './types/index.js'; +import { EventBusNotConnectedError } from './errors/index.js'; +import { EventEmitter } from '../event/event-emitter.js'; -export class EventBus +export class EventBus extends EventEmitter implements IEventBus { - protected static instance: IEventBus | null = null; protected connected = false; protected constructor() { @@ -32,22 +28,33 @@ export class EventBus ...args: Parameters ): boolean { if (!this.connected) { - throw new EventBusConnectionError(); + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return false; } return super.emit(event, ...args); } override on(event: E, listener: Events[E]): this { + if (event === 'error') { + super.on('error', listener); + return this; + } if (!this.connected) { - throw new EventBusConnectionError(); + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return this; } super.on(event, listener); return this; } override once(event: E, listener: Events[E]): this { + if (event === 'error') { + super.once('error', listener); + return this; + } if (!this.connected) { - throw new EventBusConnectionError(); + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return this; } super.once(event, listener); return this; @@ -56,8 +63,13 @@ export class EventBus override removeAllListeners( event?: Extract, ): this { + if (event === 'error') { + super.removeAllListeners('error'); + return this; + } if (!this.connected) { - throw new EventBusConnectionError(); + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return this; } super.removeAllListeners(event); return this; @@ -67,27 +79,27 @@ export class EventBus event: E, listener: Events[E], ): this { + if (event === 'error') { + super.removeListener('error', listener); + return this; + } if (!this.connected) { - throw new EventBusConnectionError(); + this.eventEmitter.emit('error', new EventBusNotConnectedError()); + return this; } super.removeListener(event, listener); return this; } - disconnect(cb: ICallback) { - if (this.connected) { - this.connected = false; - if (this === EventBus.instance) EventBus.instance = null; - } + shutDown(cb: ICallback) { + if (this.connected) this.connected = false; cb(); } - static getInstance( + static createInstance( cb: ICallback>, ): void { - if (!EventBus.instance) { - EventBus.instance = new EventBus(); - } - cb(null, EventBus.instance); + const instance = new EventBus(); + cb(null, instance); } } diff --git a/src/event-bus/index.ts b/src/event-bus/index.ts new file mode 100644 index 0000000..587a1bf --- /dev/null +++ b/src/event-bus/index.ts @@ -0,0 +1,13 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './errors/index.js'; +export * from './types/index.js'; +export * from './event-bus.js'; +export * from './event-bus-redis.js'; diff --git a/src/event-bus/types/event-bus.ts b/src/event-bus/types/event-bus.ts new file mode 100644 index 0000000..2b11574 --- /dev/null +++ b/src/event-bus/types/event-bus.ts @@ -0,0 +1,23 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ICallback } from '../../common/index.js'; +import { + IEventEmitter, + TEventEmitterEvent, +} from '../../event/types/event-emitter.js'; + +export type TEventBusEvent = TEventEmitterEvent & { + error: (err: Error) => void; +}; + +export interface IEventBus + extends IEventEmitter { + shutDown(cb: ICallback): void; +} diff --git a/types/event/index.ts b/src/event-bus/types/index.ts similarity index 87% rename from types/event/index.ts rename to src/event-bus/types/index.ts index 6d46f4e..50d7a8d 100644 --- a/types/event/index.ts +++ b/src/event-bus/types/index.ts @@ -7,5 +7,4 @@ * in the root directory of this source tree. */ -export * from './event-emitter.js'; export * from './event-bus.js'; diff --git a/src/event/event-bus-redis/event-bus-redis.ts b/src/event/event-bus-redis/event-bus-redis.ts deleted file mode 100644 index cdffca4..0000000 --- a/src/event/event-bus-redis/event-bus-redis.ts +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { - ICallback, - IEventBus, - IRedisConfig, - TEventEmitterEvent, -} from '../../../types/index.js'; -import { async } from '../../async/async.js'; -import { redis } from '../../redis-client/index.js'; -import { RedisClient } from '../../redis-client/redis-client.js'; -import { EventBusConnectionError } from '../errors/index.js'; -import { EventEmitter } from '../event-emitter.js'; - -export class EventBusRedis - extends EventEmitter - implements IEventBus -{ - protected static instance: IEventBus | null = null; - protected connected = false; - protected pubClient; - protected subClient; - - protected constructor(pubClient: RedisClient, subClient: RedisClient) { - super(); - this.pubClient = pubClient; - this.subClient = subClient; - this.subClient.on('message', (channel: keyof Events, message: string) => { - this.eventEmitter.emit(String(channel), ...JSON.parse(message)); - }); - this.connected = true; - } - - override emit( - event: E, - ...args: Parameters - ): boolean { - if (!this.connected) { - throw new EventBusConnectionError(); - } - this.pubClient.publish(String(event), JSON.stringify(args), () => void 0); - return true; - } - - override on(event: E, listener: Events[E]): this { - if (!this.connected) { - throw new EventBusConnectionError(); - } - this.subClient.subscribe(String(event)); - super.on(event, listener); - return this; - } - - override once(event: E, listener: Events[E]): this { - if (!this.connected) { - throw new EventBusConnectionError(); - } - this.subClient.subscribe(String(event)); - super.once(event, listener); - return this; - } - - override removeAllListeners( - event?: Extract, - ): this { - if (!this.connected) { - throw new EventBusConnectionError(); - } - if (event) this.subClient.unsubscribe(String(event)); - else this.subClient.unsubscribe(); - super.removeAllListeners(event); - return this; - } - - override removeListener( - event: E, - listener: Events[E], - ): this { - if (!this.connected) { - throw new EventBusConnectionError(); - } - super.removeListener(event, listener); - return this; - } - - disconnect(cb: ICallback) { - if (this.connected) { - async.waterfall( - [ - (cb: ICallback) => this.subClient.halt(() => cb()), - (cb: ICallback) => this.pubClient.halt(() => cb()), - ], - () => { - this.connected = false; - if (this === EventBusRedis.instance) EventBusRedis.instance = null; - cb(); - }, - ); - } else cb(); - } - - static getInstance( - config: IRedisConfig, - cb: ICallback>, - ): void { - if (!EventBusRedis.instance) { - let pubClient: RedisClient | null | undefined = null; - let subClient: RedisClient | null | undefined = null; - async.waterfall( - [ - (cb: ICallback) => - redis.createInstance(config, (err, client) => { - if (err) cb(err); - else { - pubClient = client; - cb(); - } - }), - (cb: ICallback) => - redis.createInstance(config, (err, client) => { - if (err) cb(err); - else { - subClient = client; - cb(); - } - }), - ], - (err) => { - if (err) { - if (pubClient) pubClient.halt(() => void 0); - if (subClient) subClient.halt(() => void 0); - cb(err); - } else if (!pubClient || !subClient) cb(new Error()); - else { - EventBusRedis.instance = new EventBusRedis(pubClient, subClient); - cb(null, EventBusRedis.instance); - } - }, - ); - } else cb(null, EventBusRedis.instance); - } -} diff --git a/src/event/event-emitter.ts b/src/event/event-emitter.ts index 00c7fad..448bcba 100644 --- a/src/event/event-emitter.ts +++ b/src/event/event-emitter.ts @@ -8,7 +8,7 @@ */ import { EventEmitter as NodeEventEmitter } from 'events'; -import { IEventEmitter, TEventEmitterEvent } from '../../types/index.js'; +import { IEventEmitter, TEventEmitterEvent } from './types/index.js'; // A typed EventEmitter around Node's EventEmitter with a limited set of methods export class EventEmitter diff --git a/src/event/index.ts b/src/event/index.ts index 6812e4c..d5eeefe 100644 --- a/src/event/index.ts +++ b/src/event/index.ts @@ -8,5 +8,4 @@ */ export * from './event-emitter.js'; -export * from './event-bus-redis/event-bus-redis.js'; -export * from './event-bus/event-bus.js'; +export * from './types/index.js'; diff --git a/types/event/event-emitter.ts b/src/event/types/event-emitter.ts similarity index 100% rename from types/event/event-emitter.ts rename to src/event/types/event-emitter.ts diff --git a/src/event/types/index.ts b/src/event/types/index.ts new file mode 100644 index 0000000..a6dc4dd --- /dev/null +++ b/src/event/types/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './event-emitter.js'; +export * from '../../event-bus/types/event-bus.js'; diff --git a/src/locker/errors/index.ts b/src/locker/errors/index.ts index 3ddc8ff..af5c814 100644 --- a/src/locker/errors/index.ts +++ b/src/locker/errors/index.ts @@ -8,7 +8,6 @@ */ export { LockError } from './lock.error.js'; -export { LockAbortError } from './lock-abort.error.js'; export { LockAcquireError } from './lock-acquire.error.js'; export { LockExtendError } from './lock-extend.error.js'; export { LockMethodNotAllowedError } from './lock-method-not-allowed.error.js'; diff --git a/src/locker/errors/lock-abort.error.ts b/src/locker/errors/lock-abort.error.ts deleted file mode 100644 index 42a247e..0000000 --- a/src/locker/errors/lock-abort.error.ts +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { LockError } from './lock.error.js'; - -export class LockAbortError extends LockError { - constructor(message = `releaseLock() may have been called. Abandoning.`) { - super(message); - } -} diff --git a/src/locker/index.ts b/src/locker/index.ts new file mode 100644 index 0000000..0605505 --- /dev/null +++ b/src/locker/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './errors/index.js'; +export * from './locker.js'; diff --git a/src/locker/locker.ts b/src/locker/locker.ts index 500e4bd..ec0feb0 100644 --- a/src/locker/locker.ts +++ b/src/locker/locker.ts @@ -7,20 +7,19 @@ * in the root directory of this source tree. */ -import { ICallback, IEventBus, ILogger } from '../../types/index.js'; -import { CallbackEmptyReplyError } from '../errors/callback-empty-reply.error.js'; -import { EventBus } from '../event/event-bus/event-bus.js'; -import { RedisClient } from '../redis-client/redis-client.js'; -import { Runnable } from '../runnable/runnable.js'; -import { Timer } from '../timer/timer.js'; +import { ICallback } from '../common/index.js'; +import { AbortError } from '../errors/index.js'; +import { ILogger } from '../logger/index.js'; +import { IRedisClient } from '../redis-client/index.js'; +import { Runnable } from '../runnable/index.js'; +import { Timer } from '../timer/index.js'; import { - LockAbortError, LockAcquireError, LockExtendError, LockMethodNotAllowedError, LockNotAcquiredError, } from './errors/index.js'; -import { ELuaScript } from './redis-client/redis-client.js'; +import { ELuaScript } from './redis-client/lua-scripts/lua-scripts.js'; export type TLockerEvent = { 'locker.up': (id: string) => void; @@ -30,51 +29,35 @@ export type TLockerEvent = { 'locker.error': (error: Error, id: string) => void; }; -export class Locker extends Runnable { +export class Locker extends Runnable { protected readonly lockKey; protected readonly retryOnFail; protected readonly ttl; protected readonly redisClient; protected readonly autoExtendInterval; protected readonly timer; - protected eventBus: IEventBus | null = null; + protected readonly logger; constructor( - redisClient: RedisClient, + redisClient: IRedisClient, logger: ILogger, lockKey: string, ttl: number, retryOnFail = false, autoExtendInterval: number = 0, ) { - super(logger); + super(); this.lockKey = lockKey; this.ttl = ttl; this.retryOnFail = retryOnFail; this.autoExtendInterval = autoExtendInterval; - this.timer = new Timer(); + this.logger = logger; this.redisClient = redisClient; this.redisClient.on('error', (err) => this.handleError(err)); + this.timer = new Timer(); + this.timer.on('error', (err) => this.handleError(err)); } - protected setUpEventBus = (cb: ICallback): void => { - if (!this.eventBus) { - EventBus.getInstance((err, instance) => { - if (err) cb(err); - else if (!instance) cb(new CallbackEmptyReplyError()); - else { - this.eventBus = instance; - cb(); - } - }); - } else cb(); - }; - - protected tearDownEventBus = (cb: ICallback): void => { - if (this.eventBus) this.eventBus.disconnect(cb); - else cb(); - }; - protected lock = (cb: ICallback) => { this.redisClient.set( this.lockKey, @@ -95,7 +78,7 @@ export class Locker extends Runnable { else cb(new LockAcquireError()); } else cb(); } else { - cb(new LockAbortError()); + cb(new AbortError()); } }, ); @@ -113,9 +96,9 @@ export class Locker extends Runnable { else { if (this.powerSwitch.isRunning()) { if (reply !== 1) { - this.forceShutdown(() => cb(new LockExtendError())); + this.shutdown(() => cb(new LockExtendError())); } else cb(); - } else cb(new LockAbortError()); + } else cb(new AbortError()); } }, ); @@ -131,7 +114,7 @@ export class Locker extends Runnable { ); }; - protected tearDownTicker = (cb: ICallback): void => { + protected resetTimer = (cb: ICallback): void => { this.timer.reset(); cb(); }; @@ -142,7 +125,7 @@ export class Locker extends Runnable { () => this.extend((err) => { if (err) { - if (!(err instanceof LockAbortError)) this.handleError(err); + if (!(err instanceof AbortError)) this.handleError(err); } else this.autoExtendLock(); }), this.autoExtendInterval, @@ -151,13 +134,20 @@ export class Locker extends Runnable { } protected override goingUp(): ((cb: ICallback) => void)[] { - return super.goingUp().concat([this.setUpEventBus, this.lock]); + return super.goingUp().concat([this.lock]); } protected override goingDown(): ((cb: ICallback) => void)[] { - return [this.tearDownTicker, this.release, this.tearDownEventBus].concat( - super.goingDown(), - ); + return [this.resetTimer, this.release].concat(super.goingDown()); + } + + protected override handleError(err: Error) { + this.emit('locker.error', err, this.id); + super.handleError(err); + } + + protected override getLogger(): ILogger { + return this.logger; } override run(cb: ICallback) { @@ -172,16 +162,11 @@ export class Locker extends Runnable { }); } - protected override handleError(err: Error) { - this.eventBus?.emit('locker.error', err, this.id); - super.handleError(err); - } - acquireLock(cb: ICallback) { this.run(cb); } - releaseLock(cb: ICallback) { + releaseLock(cb: ICallback) { this.shutdown(cb); } diff --git a/src/locker/redis-client/redis-client.ts b/src/locker/redis-client/lua-scripts/lua-scripts.ts similarity index 55% rename from src/locker/redis-client/redis-client.ts rename to src/locker/redis-client/lua-scripts/lua-scripts.ts index 13075bd..95af55e 100644 --- a/src/locker/redis-client/redis-client.ts +++ b/src/locker/redis-client/lua-scripts/lua-scripts.ts @@ -9,8 +9,8 @@ import fs from 'fs'; import { resolve } from 'path'; -import { getDirname } from '../../env/environment.js'; -import { RedisClient } from '../../redis-client/redis-client.js'; +import { getDirname } from '../../../env/environment.js'; +import { RedisClientAbstract } from '../../../redis-client/clients/redis-client-abstract.js'; export enum ELuaScript { RELEASE_LOCK = 'RELEASE_LOCK', @@ -19,11 +19,11 @@ export enum ELuaScript { const dir = getDirname(); -RedisClient.addScript( +RedisClientAbstract.addScript( ELuaScript.RELEASE_LOCK, - fs.readFileSync(resolve(dir, './lua/release-lock.lua')).toString(), + fs.readFileSync(resolve(dir, './scripts/release-lock.lua')).toString(), ); -RedisClient.addScript( +RedisClientAbstract.addScript( ELuaScript.EXTEND_LOCK, - fs.readFileSync(resolve(dir, './lua/extend-lock.lua')).toString(), + fs.readFileSync(resolve(dir, './scripts/extend-lock.lua')).toString(), ); diff --git a/src/locker/redis-client/lua/extend-lock.lua b/src/locker/redis-client/lua-scripts/scripts/extend-lock.lua similarity index 100% rename from src/locker/redis-client/lua/extend-lock.lua rename to src/locker/redis-client/lua-scripts/scripts/extend-lock.lua diff --git a/src/locker/redis-client/lua/release-lock.lua b/src/locker/redis-client/lua-scripts/scripts/release-lock.lua similarity index 100% rename from src/locker/redis-client/lua/release-lock.lua rename to src/locker/redis-client/lua-scripts/scripts/release-lock.lua diff --git a/src/logger/index.ts b/src/logger/index.ts new file mode 100644 index 0000000..d1220bc --- /dev/null +++ b/src/logger/index.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './types/index.js'; +export * from './errors/index.js'; +export * from './logger.js'; diff --git a/src/logger/logger.ts b/src/logger/logger.ts index 7defa7a..8d04503 100644 --- a/src/logger/logger.ts +++ b/src/logger/logger.ts @@ -8,7 +8,7 @@ */ import { createLogger } from 'bunyan'; -import { ILogger, ILoggerConfig } from '../../types/index.js'; +import { ILogger, ILoggerConfig } from './types/index.js'; import { LoggerError } from './errors/index.js'; const noop = () => void 0; diff --git a/types/logger/config.ts b/src/logger/types/config.ts similarity index 100% rename from types/logger/config.ts rename to src/logger/types/config.ts diff --git a/types/logger/index.ts b/src/logger/types/index.ts similarity index 100% rename from types/logger/index.ts rename to src/logger/types/index.ts diff --git a/src/power-switch/index.ts b/src/power-switch/index.ts new file mode 100644 index 0000000..90a038b --- /dev/null +++ b/src/power-switch/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './power-switch.js'; diff --git a/types/index.ts b/src/redis-client/clients/index.ts similarity index 50% rename from types/index.ts rename to src/redis-client/clients/index.ts index 3aa011f..8c9c818 100644 --- a/types/index.ts +++ b/src/redis-client/clients/index.ts @@ -7,9 +7,8 @@ * in the root directory of this source tree. */ -export * from './common.js'; -export * from './redis/index.js'; -export * from './logger/index.js'; -export * from './event/index.js'; -export * from './worker/index.js'; -export * from './timer/index.js'; +export * from './redis-client-abstract.js'; +export * from './ioredis-client.js'; +export * from './ioredis-client-multi.js'; +export * from './node-redis-client.js'; +export * from './node-redis-client-multi.js'; diff --git a/src/redis-client/clients/ioredis-client-multi.ts b/src/redis-client/clients/ioredis-client-multi.ts index 6b8b4f9..c477356 100644 --- a/src/redis-client/clients/ioredis-client-multi.ts +++ b/src/redis-client/clients/ioredis-client-multi.ts @@ -8,7 +8,8 @@ */ import { Pipeline, Redis } from 'ioredis'; -import { ICallback, IRedisTransaction } from '../../../types/index.js'; +import { ICallback } from '../../common/index.js'; +import { IRedisTransaction } from '../types/index.js'; import { RedisClientError, WatchedKeysChangedError } from '../errors/index.js'; export class IoredisClientMulti implements IRedisTransaction { diff --git a/src/redis-client/clients/ioredis-client.ts b/src/redis-client/clients/ioredis-client.ts index 9d023a9..135c31a 100644 --- a/src/redis-client/clients/ioredis-client.ts +++ b/src/redis-client/clients/ioredis-client.ts @@ -8,13 +8,13 @@ */ import IORedis, { Redis, RedisOptions } from 'ioredis'; -import { ICallback } from '../../../types/index.js'; import { CallbackEmptyReplyError } from '../../errors/index.js'; import { RedisClientError } from '../errors/index.js'; -import { RedisClient } from '../redis-client.js'; +import { RedisClientAbstract } from './redis-client-abstract.js'; import { IoredisClientMulti } from './ioredis-client-multi.js'; +import { ICallback } from '../../common/index.js'; -export class IoredisClient extends RedisClient { +export class IoredisClient extends RedisClientAbstract { protected client: Redis; constructor(config: RedisOptions = {}) { @@ -389,7 +389,7 @@ export class IoredisClient extends RedisClient { } } - quit(cb: ICallback = () => void 0): void { + shutDown(cb: ICallback = () => void 0): void { if (!this.connectionClosed) { this.client.once('end', cb); this.client.quit(); diff --git a/src/redis-client/clients/node-redis-client-multi.ts b/src/redis-client/clients/node-redis-client-multi.ts index 5c718b7..fd44ecf 100644 --- a/src/redis-client/clients/node-redis-client-multi.ts +++ b/src/redis-client/clients/node-redis-client-multi.ts @@ -8,12 +8,12 @@ */ import { WatchError } from '@redis/client'; +import { ICallback } from '../../common/index.js'; import { - ICallback, IRedisTransaction, TRedisClientNodeRedis, TRedisTransactionNodeRedis, -} from '../../../types/index.js'; +} from '../types/index.js'; import { WatchedKeysChangedError } from '../errors/index.js'; export class NodeRedisClientMulti implements IRedisTransaction { diff --git a/src/redis-client/clients/node-redis-client.ts b/src/redis-client/clients/node-redis-client.ts index b496870..72847cd 100644 --- a/src/redis-client/clients/node-redis-client.ts +++ b/src/redis-client/clients/node-redis-client.ts @@ -8,12 +8,12 @@ */ import { createClient, RedisClientOptions } from '@redis/client'; -import { ICallback } from '../../../types/index.js'; import { RedisClientError } from '../errors/index.js'; -import { RedisClient } from '../redis-client.js'; +import { RedisClientAbstract } from './redis-client-abstract.js'; import { NodeRedisClientMulti } from './node-redis-client-multi.js'; +import { ICallback } from '../../common/index.js'; -export class NodeRedisClient extends RedisClient { +export class NodeRedisClient extends RedisClientAbstract { protected client; constructor(config: RedisClientOptions = {}) { @@ -517,7 +517,7 @@ export class NodeRedisClient extends RedisClient { } } - quit(cb: ICallback = () => void 0): void { + shutDown(cb: ICallback = () => void 0): void { if (!this.connectionClosed) { this.client.once('end', cb); this.client.quit(); diff --git a/src/redis-client/redis-client.ts b/src/redis-client/clients/redis-client-abstract.ts similarity index 90% rename from src/redis-client/redis-client.ts rename to src/redis-client/clients/redis-client-abstract.ts index 3c26767..5fb71fb 100644 --- a/src/redis-client/redis-client.ts +++ b/src/redis-client/clients/redis-client-abstract.ts @@ -8,19 +8,19 @@ */ import { - ICallback, IRedisClient, IRedisTransaction, TRedisClientEvent, -} from '../../types/index.js'; -import { CallbackEmptyReplyError } from '../errors/index.js'; -import { EventEmitter } from '../event/index.js'; -import { RedisClientError } from './errors/index.js'; -import { ELuaScriptName, LuaScript } from './lua-script.js'; +} from '../types/index.js'; +import { CallbackEmptyReplyError } from '../../errors/index.js'; +import { EventEmitter } from '../../event/index.js'; +import { RedisClientError } from '../errors/index.js'; +import { ELuaScriptName, LuaScript } from '../lua-scripts/lua-script.js'; +import { ICallback } from '../../common/index.js'; const minimalSupportedVersion: [number, number, number] = [4, 0, 0]; -export abstract class RedisClient +export abstract class RedisClientAbstract extends EventEmitter implements IRedisClient { @@ -28,15 +28,15 @@ export abstract class RedisClient protected connectionClosed = true; validateRedisVersion(major: number, feature = 0, minor = 0): boolean { - if (!RedisClient.redisServerVersion) { + if (!RedisClientAbstract.redisServerVersion) { this.emit('error', new RedisClientError('UNKNOWN_REDIS_SERVER_VERSION')); return false; } return ( - RedisClient.redisServerVersion[0] > major || - (RedisClient.redisServerVersion[0] === major && - RedisClient.redisServerVersion[1] >= feature && - RedisClient.redisServerVersion[2] >= minor) + RedisClientAbstract.redisServerVersion[0] > major || + (RedisClientAbstract.redisServerVersion[0] === major && + RedisClientAbstract.redisServerVersion[1] >= feature && + RedisClientAbstract.redisServerVersion[2] >= minor) ); } @@ -47,7 +47,7 @@ export abstract class RedisClient cb(new RedisClientError('UNSUPPORTED_REDIS_SERVER_VERSION')); else cb(); }; - if (!RedisClient.redisServerVersion) { + if (!RedisClientAbstract.redisServerVersion) { this.updateServerVersion((err) => { if (err) cb(err); else validate(cb); @@ -322,17 +322,17 @@ export abstract class RedisClient abstract end(flush: boolean): void; - abstract quit(cb: ICallback): void; + abstract shutDown(cb: ICallback): void; abstract getInfo(cb: ICallback): void; updateServerVersion(cb: ICallback): void { - if (!RedisClient.redisServerVersion) { + if (!RedisClientAbstract.redisServerVersion) { this.getInfo((err, res) => { if (err) cb(err); else if (!res) cb(new CallbackEmptyReplyError()); else { - RedisClient.redisServerVersion = res + RedisClientAbstract.redisServerVersion = res .split('\r\n')[1] .split(':')[1] .split('.') diff --git a/src/redis-client/create-redis-client.ts b/src/redis-client/create-redis-client.ts new file mode 100644 index 0000000..82d61f3 --- /dev/null +++ b/src/redis-client/create-redis-client.ts @@ -0,0 +1,43 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { async } from '../async/index.js'; +import { ICallback } from '../common/index.js'; +import { IoredisClient, NodeRedisClient } from './clients/index.js'; +import { + ERedisConfigClient, + IRedisClient, + IRedisConfig, +} from './types/index.js'; + +function getClient(config: IRedisConfig) { + if (config.client === ERedisConfigClient.REDIS) { + return new NodeRedisClient(config.options); + } + return new IoredisClient(config.options); +} + +export function createRedisClient( + config: IRedisConfig, + cb: ICallback, +): void { + const client = getClient(config); + client.once('ready', () => { + async.waterfall( + [ + (cb: ICallback) => client.validateRedisServerSupport(cb), + (cb: ICallback) => client.loadScripts(cb), + ], + (err) => { + if (err) cb(err); + else cb(null, client); + }, + ); + }); +} diff --git a/src/redis-client/index.ts b/src/redis-client/index.ts index 14a279d..5372668 100644 --- a/src/redis-client/index.ts +++ b/src/redis-client/index.ts @@ -7,42 +7,8 @@ * in the root directory of this source tree. */ -import { - ERedisConfigClient, - ICallback, - IRedisConfig, -} from '../../types/index.js'; -import { async } from '../async/async.js'; -import { IoredisClient } from './clients/ioredis-client.js'; -import { NodeRedisClient } from './clients/node-redis-client.js'; -import { RedisClient } from './redis-client.js'; - -function getClient(config: IRedisConfig) { - if (config.client === ERedisConfigClient.REDIS) { - return new NodeRedisClient(config.options); - } - return new IoredisClient(config.options); -} - -function createInstance( - config: IRedisConfig, - cb: ICallback, -): void { - const client = getClient(config); - client.once('ready', () => { - async.waterfall( - [ - (cb: ICallback) => client.validateRedisServerSupport(cb), - (cb: ICallback) => client.loadScripts(cb), - ], - (err) => { - if (err) cb(err); - else cb(null, client); - }, - ); - }); -} - -export const redis = { - createInstance, -}; +export * from './types/index.js'; +export * from './errors/index.js'; +export * from './clients/index.js'; +export * from './lua-scripts/index.js'; +export * from './create-redis-client.js'; diff --git a/src/redis-client/lua-scripts/index.ts b/src/redis-client/lua-scripts/index.ts new file mode 100644 index 0000000..41450f4 --- /dev/null +++ b/src/redis-client/lua-scripts/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './lua-script.js'; diff --git a/src/redis-client/lua-script.ts b/src/redis-client/lua-scripts/lua-script.ts similarity index 81% rename from src/redis-client/lua-script.ts rename to src/redis-client/lua-scripts/lua-script.ts index 5dfa294..8928590 100644 --- a/src/redis-client/lua-script.ts +++ b/src/redis-client/lua-scripts/lua-script.ts @@ -9,11 +9,12 @@ import fs from 'fs'; import { resolve } from 'path'; -import type { ICallback, IRedisClient } from '../../types/index.js'; -import { async } from '../async/async.js'; -import { getDirname } from '../env/environment.js'; -import { CallbackEmptyReplyError } from '../errors/index.js'; -import { RedisClientError } from './errors/index.js'; +import { async } from '../../async/index.js'; +import { ICallback } from '../../common/index.js'; +import { getDirname } from '../../env/environment.js'; +import { CallbackEmptyReplyError } from '../../errors/index.js'; +import { RedisClientError } from '../errors/index.js'; +import { IRedisClient } from '../types/index.js'; const dir = getDirname(); @@ -31,11 +32,11 @@ export class LuaScript { this.scripts = new Map(); this.addScript( ELuaScriptName.ZPOPRPUSH, - fs.readFileSync(resolve(dir, './lua/zpoprpush.lua')).toString(), + fs.readFileSync(resolve(dir, './scripts/zpoprpush.lua')).toString(), ); this.addScript( ELuaScriptName.LPOPRPUSH, - fs.readFileSync(resolve(dir, './lua/lpoprpush.lua')).toString(), + fs.readFileSync(resolve(dir, './scripts/lpoprpush.lua')).toString(), ); } diff --git a/src/redis-client/lua/lpoprpush.lua b/src/redis-client/lua-scripts/scripts/lpoprpush.lua similarity index 100% rename from src/redis-client/lua/lpoprpush.lua rename to src/redis-client/lua-scripts/scripts/lpoprpush.lua diff --git a/src/redis-client/lua/zpoprpush.lua b/src/redis-client/lua-scripts/scripts/zpoprpush.lua similarity index 100% rename from src/redis-client/lua/zpoprpush.lua rename to src/redis-client/lua-scripts/scripts/zpoprpush.lua diff --git a/types/redis/config.ts b/src/redis-client/types/config.ts similarity index 100% rename from types/redis/config.ts rename to src/redis-client/types/config.ts diff --git a/src/redis-client/types/index.ts b/src/redis-client/types/index.ts new file mode 100644 index 0000000..c652cb3 --- /dev/null +++ b/src/redis-client/types/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './config.js'; +export * from './redis-client.js'; diff --git a/types/redis/index.ts b/src/redis-client/types/redis-client.ts similarity index 95% rename from types/redis/index.ts rename to src/redis-client/types/redis-client.ts index 0d3fc6a..8924741 100644 --- a/types/redis/index.ts +++ b/src/redis-client/types/redis-client.ts @@ -14,7 +14,8 @@ import { RedisModules, RedisScripts, } from '@redis/client'; -import { ICallback } from '../common.js'; +import { ICallback } from '../../common/index.js'; +import { EventEmitter } from '../../event/index.js'; export * from './config.js'; @@ -38,7 +39,7 @@ export type TRedisClientNodeRedis = RedisClientType< RedisScripts >; -export interface IRedisClient { +export interface IRedisClient extends EventEmitter { validateRedisVersion( major: number, feature?: number, @@ -85,11 +86,11 @@ export interface IRedisClient { psubscribe(pattern: string): void; - punsubscribe(channel: string): void; + punsubscribe(channel?: string): void; subscribe(channel: string): void; - unsubscribe(channel: string): void; + unsubscribe(channel?: string): void; zrangebyscore( key: string, @@ -245,7 +246,7 @@ export interface IRedisClient { end(flush: boolean): void; - quit(cb: ICallback): void; + shutDown(cb: ICallback): void; getInfo(cb: ICallback): void; diff --git a/src/runnable/index.ts b/src/runnable/index.ts new file mode 100644 index 0000000..c864919 --- /dev/null +++ b/src/runnable/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './runnable.js'; diff --git a/src/runnable/runnable.ts b/src/runnable/runnable.ts index 53f5382..458dbe6 100644 --- a/src/runnable/runnable.ts +++ b/src/runnable/runnable.ts @@ -8,20 +8,25 @@ */ import { v4 as uuid } from 'uuid'; -import { ICallback, ILogger } from '../../types/index.js'; -import { async } from '../async/async.js'; -import { PowerSwitch } from '../power-switch/power-switch.js'; +import { async } from '../async/index.js'; +import { ICallback } from '../common/index.js'; +import { AbortError } from '../errors/index.js'; +import { EventEmitter, TEventEmitterEvent } from '../event/index.js'; +import { ILogger } from '../logger/index.js'; +import { PowerSwitch } from '../power-switch/index.js'; -export class Runnable { +export abstract class Runnable< + Event extends TEventEmitterEvent, +> extends EventEmitter { protected id; protected powerSwitch; - protected logger; protected forceShutdownOnError = true; + protected cleanUpBeforeShutdown = false; - constructor(logger: ILogger) { + protected constructor() { + super(); this.id = uuid(); this.powerSwitch = new PowerSwitch(); - this.logger = logger; } protected goingUp(): ((cb: ICallback) => void)[] { @@ -42,50 +47,72 @@ export class Runnable { cb(null, true); } - protected forceShutdown(cb: ICallback): void { - if (this.powerSwitch.isGoingUp()) this.powerSwitch.rollback(); - if (this.powerSwitch.isRunning()) this.powerSwitch.goingDown(); - const tasks = this.goingDown(); - async.waterfall(tasks, () => { - if (this.powerSwitch.isGoingDown()) { - this.down(() => cb()); - } else cb(); - }); - } - protected handleError(err: Error): void { - if (this.powerSwitch.isGoingUp() || this.powerSwitch.isRunning()) { - this.logger.error(err); - this.forceShutdown(() => void 0); + if (this.isRunning()) { + this.getLogger().error(err); + this.shutdown(() => void 0); } } + protected abstract getLogger(): ILogger; + isRunning() { - return this.powerSwitch.isRunning() || this.powerSwitch.goingUp(); + return this.powerSwitch.isRunning() || this.powerSwitch.isGoingUp(); + } + + isGoingUp() { + return this.powerSwitch.isGoingUp(); + } + + isGoingDown() { + return this.powerSwitch.isGoingDown(); + } + + isUp() { + return this.powerSwitch.isUp(); + } + + isDown() { + return this.powerSwitch.isDown(); } run(cb: ICallback): void { const r = this.powerSwitch.goingUp(); if (r) { - const tasks = this.goingUp(); + const tasks = this.goingUp().map((task) => (cb: ICallback) => { + if (this.isGoingUp()) { + this.cleanUpBeforeShutdown = true; + task(cb); + } else cb(new AbortError()); + }); async.waterfall(tasks, (err) => { - if (err) { - if (this.forceShutdownOnError) this.forceShutdown(() => cb(err)); - else cb(err); - } else this.up(cb); + if (this.isRunning()) { + if (err) { + if (this.forceShutdownOnError) this.shutdown(() => cb(err)); + else cb(err); + } else this.up(cb); + } else this.shutdown(() => cb(new AbortError())); }); } else cb(null, r); } - shutdown(cb: ICallback): void { - const r = this.powerSwitch.goingDown(); - if (r) { + shutdown(cb: ICallback): void { + /* + down and null -> do nothing + down and goingUp -> rollback + up and null -> rollback + up and goingDown -> do nothing + */ + if (this.isRunning()) { + if (this.isGoingUp()) this.powerSwitch.rollback(); + if (this.isUp()) this.powerSwitch.goingDown(); const tasks = this.goingDown(); + this.cleanUpBeforeShutdown = false; async.waterfall(tasks, () => { - // ignoring shutdown errors - this.down(cb); + if (this.cleanUpBeforeShutdown) this.shutdown(cb); + else this.down(() => cb()); }); - } else cb(null, r); + } else cb(); } getId(): string { diff --git a/src/timer/index.ts b/src/timer/index.ts new file mode 100644 index 0000000..5bc7cc0 --- /dev/null +++ b/src/timer/index.ts @@ -0,0 +1,12 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './errors/index.js'; +export * from './types/index.js'; +export * from './timer.js'; diff --git a/src/timer/timer.ts b/src/timer/timer.ts index ed5b032..7552f2b 100644 --- a/src/timer/timer.ts +++ b/src/timer/timer.ts @@ -7,8 +7,8 @@ * in the root directory of this source tree. */ -import { TFunction } from '../../types/index.js'; -import { TTimer, TTimerEvent } from '../../types/timer/index.js'; +import { TFunction } from '../common/index.js'; +import { TTimer, TTimerEvent } from './types/index.js'; import { EventEmitter } from '../event/index.js'; import { TimerError } from './errors/index.js'; diff --git a/src/timer/types/index.ts b/src/timer/types/index.ts new file mode 100644 index 0000000..b92dfbd --- /dev/null +++ b/src/timer/types/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './timer.js'; diff --git a/types/timer/index.ts b/src/timer/types/timer.ts similarity index 87% rename from types/timer/index.ts rename to src/timer/types/timer.ts index 6938892..c2a86f5 100644 --- a/types/timer/index.ts +++ b/src/timer/types/timer.ts @@ -7,7 +7,7 @@ * in the root directory of this source tree. */ -import { TFunction } from '../common.js'; +import { TFunction } from '../../common/index.js'; export type TTimerEvent = { error: (err: Error) => void; diff --git a/src/worker/errors/worker-thread.error.ts b/src/worker/errors/worker-thread.error.ts index 7a2612c..aa95a7d 100644 --- a/src/worker/errors/worker-thread.error.ts +++ b/src/worker/errors/worker-thread.error.ts @@ -11,7 +11,7 @@ import { EWorkerThreadExecutionCode, EWorkerThreadExitCode, TWorkerThreadMessage, -} from '../../../types/worker/index.js'; +} from '../types/index.js'; import { WorkerError } from './worker-error.js'; export class WorkerThreadError extends WorkerError { diff --git a/src/worker/index.ts b/src/worker/index.ts new file mode 100644 index 0000000..c4b3d5c --- /dev/null +++ b/src/worker/index.ts @@ -0,0 +1,14 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +export * from './types/index.js'; +export * from './errors/index.js'; +export * from './worker-callable.js'; +export * from './worker-runnable.js'; +export * from './worker-resource-group.js'; diff --git a/types/worker/index.ts b/src/worker/types/index.ts similarity index 100% rename from types/worker/index.ts rename to src/worker/types/index.ts diff --git a/types/worker/worker.ts b/src/worker/types/worker.ts similarity index 96% rename from types/worker/worker.ts rename to src/worker/types/worker.ts index b80cfe5..7b3c3ef 100644 --- a/types/worker/worker.ts +++ b/src/worker/types/worker.ts @@ -7,7 +7,7 @@ * in the root directory of this source tree. */ -import { ICallback } from '../common.js'; +import { ICallback } from '../../common/index.js'; export enum EWorkerType { CALLABLE, diff --git a/src/worker/worker-callable.ts b/src/worker/worker-callable.ts index de16f68..73006a8 100644 --- a/src/worker/worker-callable.ts +++ b/src/worker/worker-callable.ts @@ -7,8 +7,8 @@ * in the root directory of this source tree. */ -import { ICallback } from '../../types/index.js'; -import { EWorkerType, IWorkerCallable } from '../../types/worker/index.js'; +import { ICallback } from '../common/index.js'; +import { EWorkerType, IWorkerCallable } from './types/index.js'; import { WorkerPayloadRequiredError } from './errors/index.js'; import { Worker } from './worker.js'; diff --git a/src/worker/worker-resource-group.ts b/src/worker/worker-resource-group.ts index 576565d..dea889d 100644 --- a/src/worker/worker-resource-group.ts +++ b/src/worker/worker-resource-group.ts @@ -9,43 +9,41 @@ import { readdir } from 'fs'; import path from 'path'; -import { ICallback, IEventBus, ILogger } from '../../types/index.js'; -import { async } from '../async/async.js'; -import { CallbackEmptyReplyError } from '../errors/callback-empty-reply.error.js'; -import { EventBus } from '../event/event-bus/event-bus.js'; -import { Locker, TLockerEvent } from '../locker/locker.js'; -import { PowerSwitch } from '../power-switch/power-switch.js'; -import { RedisClient } from '../redis-client/redis-client.js'; -import { Runnable } from '../runnable/runnable.js'; +import { async } from '../async/index.js'; +import { ICallback } from '../common/index.js'; +import { AbortError } from '../errors/index.js'; +import { Locker } from '../locker/locker.js'; +import { ILogger } from '../logger/index.js'; +import { PowerSwitch } from '../power-switch/index.js'; +import { IRedisClient } from '../redis-client/index.js'; +import { Runnable } from '../runnable/index.js'; import { WorkerRunnable } from './worker-runnable.js'; -export type TWorkerResourceGroupEvent = TLockerEvent & { - 'workerResourceGroup.error': ( - err: Error, - workerResourceGroupId: string, - ) => void; +export type TWorkerResourceGroupEvent = { + 'workerResourceGroup.error': (err: Error) => void; }; -export class WorkerResourceGroup extends Runnable { - protected readonly powerManager: PowerSwitch; - protected readonly locker: Locker; - protected readonly redisClient: RedisClient; +export class WorkerResourceGroup extends Runnable { + protected readonly powerManager; + protected readonly locker; + protected readonly redisClient; + protected readonly logger; protected workers: { instance: WorkerRunnable; payload: unknown }[] = []; - protected eventBus: IEventBus | null = null; + protected runWorkersLocked = false; constructor( - redisClient: RedisClient, + redisClient: IRedisClient, logger: ILogger, resourceGroupId: string, ) { - super(logger); + super(); this.powerManager = new PowerSwitch(); this.logger = logger; // this.redisClient = redisClient; - this.redisClient.on('error', (err) => this.handleError(err)); + this.redisClient.once('error', (err) => this.handleError(err)); // Locker this.locker = new Locker( @@ -56,77 +54,80 @@ export class WorkerResourceGroup extends Runnable { true, 15000, ); - this.eventBus?.on('locker.error', (err, id) => { - if (id === this.locker.getId()) this.handleError(err); + this.locker.on('locker.error', (err) => { + this.handleError(err); }); } - protected setUpEventBus = (cb: ICallback): void => { - if (!this.eventBus) { - EventBus.getInstance((err, instance) => { - if (err) cb(err); - else if (!instance) cb(new CallbackEmptyReplyError()); - else { - this.eventBus = instance; - cb(); - } - }); - } else cb(); - }; - - protected tearDownEventBus = (cb: ICallback): void => { - if (this.eventBus) this.eventBus.disconnect(cb); - else cb(); + protected lock = (cb: ICallback) => { + this.locker.acquireLock((err) => { + if (err) cb(err); + else { + this.logger.info( + `Workers are exclusively running from this instance (Lock ID ${this.locker.getId()}).`, + ); + cb(); + } + }); }; - protected runWorkers = (): void => { - async.waterfall( - [ - (cb: ICallback) => { - this.locker.acquireLock((err) => { - if (err) cb(err); - else { - this.logger.info( - `Workers are exclusively running from this instance (Lock ID ${this.locker.getId()}).`, - ); - cb(); - } - }); + protected runWorkers = (cb: ICallback) => { + if (!this.runWorkersLocked) { + this.runWorkersLocked = true; + async.each( + this.workers, + (worker, _, done) => { + const { instance, payload } = worker; + instance.run(payload, done); }, - (cb: ICallback) => { - async.each( - this.workers, - (worker, _, done) => { - const { instance, payload } = worker; - instance.run(payload, done); - }, - cb, - ); + (err) => { + this.runWorkersLocked = false; + cb(err); }, - ], - (err) => { - if (err) this.handleError(err); - }, - ); + ); + } else cb(new AbortError()); }; protected shutDownWorkers = (cb: ICallback): void => { - async.each( - this.workers, - (worker, _, done) => { - worker.instance.quit(() => done()); - }, - () => { - this.workers = []; - cb(); - }, - ); + if (!this.runWorkersLocked) { + this.runWorkersLocked = true; + async.each( + this.workers, + (worker, _, done) => { + worker.instance.shutDown(() => done()); + }, + () => { + this.workers = []; + this.runWorkersLocked = false; + cb(); + }, + ); + } else setTimeout(() => this.shutDownWorkers(cb), 1000); }; protected releaseLock = (cb: ICallback) => { - this.locker.releaseLock((err) => cb(err)); + this.locker.releaseLock(cb); }; + protected override getLogger(): ILogger { + return this.logger; + } + + protected override goingUp(): ((cb: ICallback) => void)[] { + return super.goingUp().concat([this.lock, this.runWorkers]); + } + + protected override goingDown(): ((cb: ICallback) => void)[] { + return [this.shutDownWorkers, this.releaseLock].concat(super.goingDown()); + } + + protected override handleError(err: Error) { + if (this.isRunning()) { + this.emit('workerResourceGroup.error', err); + super.handleError(err); + } + } + addWorker = (filename: string, payload: unknown): void => { const worker = new WorkerRunnable(filename); worker.on('worker.error', (err) => this.handleError(err)); @@ -138,45 +139,23 @@ export class WorkerResourceGroup extends Runnable { payload: unknown, cb: ICallback, ): void => { - readdir(workersDir, (err, files) => { - if (err) cb(err); - else { - async.each( - files ?? [], - (file, _, done) => { - if (file.endsWith('.worker.js')) { - const filepath = path.resolve(workersDir, file); - this.addWorker(filepath, payload); - done(); - } else done(); - }, - (err) => cb(err), - ); - } - }); + if (this.isDown() && !this.isGoingUp()) { + readdir(workersDir, (err, files) => { + if (err) cb(err); + else { + async.each( + files ?? [], + (file, _, done) => { + if (file.endsWith('.worker.js')) { + const filepath = path.resolve(workersDir, file); + this.addWorker(filepath, payload); + done(); + } else done(); + }, + (err) => cb(err), + ); + } + }); + } }; - - protected override goingUp(): ((cb: ICallback) => void)[] { - return super.goingUp().concat([this.setUpEventBus]); - } - - protected override goingDown(): ((cb: ICallback) => void)[] { - return [ - this.shutDownWorkers, - this.releaseLock, - this.tearDownEventBus, - ].concat(super.goingDown()); - } - - protected override up(cb: ICallback): void { - super.up(() => { - this.runWorkers(); - cb(); - }); - } - - protected override handleError(err: Error) { - this.eventBus?.emit('workerResourceGroup.error', err, this.id); - super.handleError(err); - } } diff --git a/src/worker/worker-runnable.ts b/src/worker/worker-runnable.ts index 996ee06..b628e4e 100644 --- a/src/worker/worker-runnable.ts +++ b/src/worker/worker-runnable.ts @@ -7,8 +7,8 @@ * in the root directory of this source tree. */ -import { ICallback } from '../../types/index.js'; -import { EWorkerType, IWorkerRunnable } from '../../types/worker/index.js'; +import { ICallback } from '../common/index.js'; +import { EWorkerType, IWorkerRunnable } from './types/index.js'; import { PowerSwitch } from '../power-switch/power-switch.js'; import { WorkerAlreadyDownError, @@ -44,10 +44,10 @@ export class WorkerRunnable } else cb(new WorkerAlreadyRunningError()); } - override quit(cb: ICallback) { + override shutDown(cb: ICallback) { const r = this.powerSwitch.goingDown(); if (r) { - super.quit(() => { + super.shutDown(() => { this.powerSwitch.commit(); cb(); }); diff --git a/src/worker/worker-thread.ts b/src/worker/worker-thread.ts index 43948f2..54d1722 100644 --- a/src/worker/worker-thread.ts +++ b/src/worker/worker-thread.ts @@ -24,7 +24,7 @@ import { TWorkerFn, TWorkerThreadMessage, TWorkerThreadMessageCode, -} from '../../types/worker/index.js'; +} from './types/index.js'; function importWorkerFn( filename: string, @@ -40,8 +40,7 @@ function importWorkerFn( exit(EWorkerThreadExitCode.INVALID_WORKER_TYPE); } else cb(fn); }) - .catch((err: unknown) => { - console.error(err); + .catch(() => { exit(EWorkerThreadExitCode.FILE_IMPORT_ERROR); }); } diff --git a/src/worker/worker.ts b/src/worker/worker.ts index d13b39e..4f764fe 100644 --- a/src/worker/worker.ts +++ b/src/worker/worker.ts @@ -9,13 +9,13 @@ import { resolve } from 'path'; import { Worker as WorkerThread } from 'worker_threads'; -import { ICallback } from '../../types/index.js'; +import { ICallback } from '../common/index.js'; import { EWorkerThreadExecutionCode, EWorkerThreadExitCode, EWorkerType, TWorkerThreadMessage, -} from '../../types/worker/index.js'; +} from './types/index.js'; import { getDirname } from '../env/environment.js'; import { EventEmitter } from '../event/index.js'; import { WorkerThreadError } from './errors/index.js'; @@ -75,7 +75,6 @@ export abstract class Worker extends EventEmitter { const onMessage = (msg: TWorkerThreadMessage) => { cleanUp(); if (msg.code !== EWorkerThreadExecutionCode.OK) { - console.error(`WorkerThreadError`, msg); callback(new WorkerThreadError(msg)); } else callback(null, msg.data); }; @@ -85,7 +84,6 @@ export abstract class Worker extends EventEmitter { code: EWorkerThreadExitCode.TERMINATED, error: null, }; - console.error('WorkerThreadError', msg); callback(new WorkerThreadError(msg)); }; worker.once('message', onMessage); @@ -98,7 +96,7 @@ export abstract class Worker extends EventEmitter { this.getWorkerThread().postMessage(payload); } - quit(cb: ICallback) { + shutDown(cb: ICallback) { const callback = () => { this.workerThread = null; cb(); diff --git a/tests/async/test00001.test.ts b/tests/async/test00001.test.ts index fbbcc6e..7d3fe06 100644 --- a/tests/async/test00001.test.ts +++ b/tests/async/test00001.test.ts @@ -9,7 +9,7 @@ import { expect, it } from '@jest/globals'; import { async } from '../../src/async/async.js'; -import { ICallback } from '../../types/index.js'; +import { ICallback } from '../../src/common/index.js'; it('async.waterfall: case 1', async () => { let count = 0; diff --git a/tests/async/test00002.test.ts b/tests/async/test00002.test.ts index 80c7ad4..70c69d1 100644 --- a/tests/async/test00002.test.ts +++ b/tests/async/test00002.test.ts @@ -9,7 +9,7 @@ import { expect, it } from '@jest/globals'; import { async } from '../../src/async/async.js'; -import { ICallback } from '../../types/index.js'; +import { ICallback } from '../../src/common/index.js'; it('async.waterfall: case 2', async () => { let count = 0; diff --git a/tests/async/test00003.test.ts b/tests/async/test00003.test.ts index cb03d1d..a8988de 100644 --- a/tests/async/test00003.test.ts +++ b/tests/async/test00003.test.ts @@ -9,7 +9,7 @@ import { expect, it } from '@jest/globals'; import { async } from '../../src/async/async.js'; -import { ICallback } from '../../types/index.js'; +import { ICallback } from '../../src/common/index.js'; it('async.waterfall: case 3', async () => { const count = 0; diff --git a/tests/common.ts b/tests/common.ts index 22a71cc..eb20658 100644 --- a/tests/common.ts +++ b/tests/common.ts @@ -8,12 +8,11 @@ */ import bluebird from 'bluebird'; -import { redis } from '../src/redis-client/index.js'; -import { RedisClient } from '../src/redis-client/redis-client.js'; +import { createRedisClient, IRedisClient } from '../src/redis-client/index.js'; import { redisConfig } from './config.js'; -const redisClients: RedisClient[] = []; -const createClientInstanceAsync = bluebird.promisify(redis.createInstance); +const redisClients: IRedisClient[] = []; +const createClientInstanceAsync = bluebird.promisify(createRedisClient); export async function startUp(): Promise { const redisClient = await getRedisInstance(); diff --git a/tests/config.ts b/tests/config.ts index 01c76c2..f9530a8 100644 --- a/tests/config.ts +++ b/tests/config.ts @@ -7,11 +7,8 @@ * in the root directory of this source tree. */ -import { - ERedisConfigClient, - ILoggerConfig, - IRedisConfig, -} from '../types/index.js'; +import { ILoggerConfig } from '../src/logger/index.js'; +import { ERedisConfigClient, IRedisConfig } from '../src/redis-client/index.js'; const redisHost = process.env.REDIS_HOST || '127.0.0.1'; const redisPort = Number(process.env.REDIS_PORT) || 6379; diff --git a/tests/event/test00001.test.ts b/tests/event/test00001.test.ts index fe6251c..3aa6749 100644 --- a/tests/event/test00001.test.ts +++ b/tests/event/test00001.test.ts @@ -9,21 +9,18 @@ import { expect, it, jest } from '@jest/globals'; import bluebird from 'bluebird'; -import { EventBusConnectionError } from '../../src/event/errors/index.js'; -import { EventBus } from '../../src/event/index.js'; +import { EventBusNotConnectedError } from '../../src/event-bus/errors/index.js'; +import { EventBus } from '../../src/event-bus/event-bus.js'; type TEvent = { e1: (arg: string) => void; - 'eventBus.disconnect': () => void; + error: (err: Error) => void; }; +const getInstanceAsync = bluebird.promisify(EventBus.createInstance); + it('EventBus: case 1', async () => { - const getInstanceAsync = await bluebird.promisify(EventBus.getInstance); - const eventBusAsync0 = bluebird.promisifyAll( - await getInstanceAsync(), - ); const eventBusAsync = bluebird.promisifyAll(await getInstanceAsync()); - expect(eventBusAsync).toBe(eventBusAsync0); // on const callback = jest.fn(); @@ -70,23 +67,31 @@ it('EventBus: case 1', async () => { eventBusAsync.emit('e1', 'hello7'); expect(callback5).toHaveBeenCalledTimes(1); - await eventBusAsync.disconnectAsync(); - expect(() => eventBusAsync.on('e1', () => void 0)).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.once('e1', () => void 0)).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.removeListener('e1', () => void 0)).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.removeAllListeners('e1')).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.removeAllListeners()).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.emit('e1', 'hello8')).toThrow( - EventBusConnectionError, - ); + await eventBusAsync.shutDownAsync(); + + const errors: Error[] = []; + eventBusAsync.once('error', (e) => errors.push(e)); + + eventBusAsync.on('e1', () => void 0); + expect(errors[0]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.on('error', (e) => errors.push(e)); + + eventBusAsync.once('e1', () => void 0); + expect(errors[1]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.removeListener('e1', () => void 0); + expect(errors[2]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.removeAllListeners('e1'); + expect(errors[3]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.removeAllListeners(); + expect(errors[4]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.emit('e1', 'hello8'); + expect(errors[5]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.removeListener('error', () => void 0); + eventBusAsync.removeAllListeners('error'); }); diff --git a/tests/event/test00002.test.ts b/tests/event/test00002.test.ts index bd1f4a4..2ec4d9c 100644 --- a/tests/event/test00002.test.ts +++ b/tests/event/test00002.test.ts @@ -9,23 +9,21 @@ import { expect, it, jest } from '@jest/globals'; import bluebird from 'bluebird'; -import { EventBusConnectionError } from '../../src/event/errors/index.js'; -import { EventBusRedis } from '../../src/event/index.js'; +import { EventBusNotConnectedError } from '../../src/event-bus/errors/index.js'; +import { EventBusRedis } from '../../src/event-bus/event-bus-redis.js'; import { redisConfig } from '../config.js'; type TEvent = { e1: (arg: string) => void; + error: (err: Error) => void; }; +const getInstanceAsync = bluebird.promisify(EventBusRedis.createInstance); + it('EventBusRedis: case 1', async () => { - const getInstanceAsync = await bluebird.promisify(EventBusRedis.getInstance); - const eventBusAsync0 = bluebird.promisifyAll( - await getInstanceAsync(redisConfig), - ); const eventBusAsync = bluebird.promisifyAll( await getInstanceAsync(redisConfig), ); - expect(eventBusAsync).toBe(eventBusAsync0); // on const callback = jest.fn(); @@ -80,27 +78,34 @@ it('EventBusRedis: case 1', async () => { eventBusAsync.emit('e1', 'hello7'); expect(callback5).toHaveBeenCalledTimes(1); - await eventBusAsync.disconnectAsync(); + await eventBusAsync.shutDownAsync(); // second time - await eventBusAsync.disconnectAsync(); + await eventBusAsync.shutDownAsync(); - expect(() => eventBusAsync.on('e1', () => void 0)).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.once('e1', () => void 0)).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.removeListener('e1', () => void 0)).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.removeAllListeners('e1')).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.removeAllListeners()).toThrow( - EventBusConnectionError, - ); - expect(() => eventBusAsync.emit('e1', 'hello8')).toThrow( - EventBusConnectionError, - ); + const errors: Error[] = []; + eventBusAsync.once('error', (e) => errors.push(e)); + + eventBusAsync.on('e1', () => void 0); + expect(errors[0]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.on('error', (e) => errors.push(e)); + + eventBusAsync.once('e1', () => void 0); + expect(errors[1]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.removeListener('e1', () => void 0); + expect(errors[2]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.removeAllListeners('e1'); + expect(errors[3]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.removeAllListeners(); + expect(errors[4]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.emit('e1', 'hello8'); + expect(errors[5]).toBeInstanceOf(EventBusNotConnectedError); + + eventBusAsync.removeListener('error', () => void 0); + eventBusAsync.removeAllListeners('error'); }); diff --git a/tests/locker/test00001.test.ts b/tests/locker/test00001.test.ts index 804364a..5cc1679 100644 --- a/tests/locker/test00001.test.ts +++ b/tests/locker/test00001.test.ts @@ -31,13 +31,13 @@ it('Locker: locker(), extend(), releaseLock()', async () => { await expect(lock.extendLockAsync()).rejects.toThrow(LockExtendError); await expect(lock.acquireLockAsync()).resolves.toBe(true); - await expect(lock.extendLockAsync()).resolves.toBeUndefined(); - await expect(lock.releaseLockAsync()).resolves.toBe(true); + await lock.extendLockAsync(); + await lock.releaseLockAsync(); expect(lock.isLocked()).toBe(false); expect(lock.isReleased()).toBe(true); - await expect(lock.releaseLockAsync()).resolves.toBe(false); + await lock.releaseLockAsync(); expect(lock.isReleased()).toBe(true); await expect(lock.extendLockAsync()).rejects.toThrow(LockNotAcquiredError); await expect(lock.acquireLockAsync()).resolves.toBe(true); - await expect(lock.releaseLockAsync()).resolves.toBe(true); + await lock.releaseLockAsync(); }); diff --git a/tests/locker/test00004.test.ts b/tests/locker/test00004.test.ts index ebc251d..fb8b7bb 100644 --- a/tests/locker/test00004.test.ts +++ b/tests/locker/test00004.test.ts @@ -8,14 +8,14 @@ */ import { expect, it, jest } from '@jest/globals'; +import { AbortError } from '../../src/errors/index.js'; import bluebird from 'bluebird'; import { EventEmitter } from 'events'; import { resolve } from 'path'; import { getDirname } from '../../src/env/environment.js'; -import { LockAbortError } from '../../src/locker/errors/index.js'; import { Locker } from '../../src/locker/locker.js'; import { IoredisClient } from '../../src/redis-client/clients/ioredis-client.js'; -import { ICallback, TFunction } from '../../types/index.js'; +import { ICallback, TFunction } from '../../src/common/index.js'; import { mockModule } from '../mock-module.js'; const dir = getDirname(); @@ -96,5 +96,5 @@ it('Locker: extendLock() -> LockAbortError', async () => { await expect(lock.acquireLockAsync()).resolves.toBe(true); await expect( Promise.all([lock.extendLockAsync(), lock.shutdownAsync()]), - ).rejects.toThrow(LockAbortError); + ).rejects.toThrow(AbortError); }); diff --git a/tests/locker/test00005.test.ts b/tests/locker/test00005.test.ts index d925472..370cd5f 100644 --- a/tests/locker/test00005.test.ts +++ b/tests/locker/test00005.test.ts @@ -8,14 +8,14 @@ */ import { expect, it, jest } from '@jest/globals'; +import { AbortError } from '../../src/errors/index.js'; import bluebird from 'bluebird'; import { EventEmitter } from 'events'; import { resolve } from 'path'; import { getDirname } from '../../src/env/environment.js'; -import { LockAbortError } from '../../src/locker/errors/index.js'; import { Locker } from '../../src/locker/locker.js'; import { IoredisClient } from '../../src/redis-client/clients/ioredis-client.js'; -import { ICallback, TFunction } from '../../types/index.js'; +import { ICallback, TFunction } from '../../src/common/index.js'; import { mockModule } from '../mock-module.js'; const dir = getDirname(); @@ -105,5 +105,5 @@ it('Locker: acquireLock() -> LockAbortError', async () => { }, 3000); }), ]), - ).rejects.toThrow(LockAbortError); + ).rejects.toThrow(AbortError); }); diff --git a/tests/mock-module.ts b/tests/mock-module.ts index 661b9c0..f69832b 100644 --- a/tests/mock-module.ts +++ b/tests/mock-module.ts @@ -8,7 +8,7 @@ */ import { jest } from '@jest/globals'; -import { TFunction } from '../types/common.js'; +import { TFunction } from '../src/common/index.js'; export function mockModule(moduleName: string, mockFactory: TFunction) { if (process.env['NODE_OPTIONS']?.includes('--experimental-vm-modules')) { diff --git a/tests/redis-client/common.ts b/tests/redis-client/common.ts index e8aff0a..0ec6bb1 100644 --- a/tests/redis-client/common.ts +++ b/tests/redis-client/common.ts @@ -9,8 +9,10 @@ import { expect } from '@jest/globals'; import bluebird from 'bluebird'; -import { RedisClient } from '../../src/redis-client/redis-client.js'; -import { IRedisConfig } from '../../types/index.js'; +import { + IRedisConfig, + RedisClientAbstract, +} from '../../src/redis-client/index.js'; import { getRedisInstance } from '../common.js'; export async function standardCommands(config: IRedisConfig) { @@ -182,13 +184,13 @@ export async function standardCommands(config: IRedisConfig) { ); } - await client.quitAsync(); // does exec quit command - await client.quitAsync(); // does not exec quit + await client.shutDownAsync(); // does exec quit command + await client.shutDownAsync(); // does not exec quit } export async function scriptRunning(config: IRedisConfig) { const client = await getRedisInstance(config); - RedisClient.addScript('test_script', 'return 1'); + RedisClientAbstract.addScript('test_script', 'return 1'); await client.loadScriptsAsync(); const r = await client.runScriptAsync('test_script', [], []); expect(r).toBe(1); diff --git a/tests/redis-client/test00001.test.ts b/tests/redis-client/test00001.test.ts index ff0b5df..6908261 100644 --- a/tests/redis-client/test00001.test.ts +++ b/tests/redis-client/test00001.test.ts @@ -8,7 +8,7 @@ */ import { it } from '@jest/globals'; -import { ERedisConfigClient } from '../../types/index.js'; +import { ERedisConfigClient } from '../../src/redis-client/index.js'; import { pubSubChannel, pubSubPattern, diff --git a/tests/redis-client/test00003.test.ts b/tests/redis-client/test00003.test.ts index 7e24643..3533a1b 100644 --- a/tests/redis-client/test00003.test.ts +++ b/tests/redis-client/test00003.test.ts @@ -8,7 +8,7 @@ */ import { it } from '@jest/globals'; -import { ERedisConfigClient } from '../../types/index.js'; +import { ERedisConfigClient } from '../../src/redis-client/index.js'; import { pubSubChannel, pubSubPattern, diff --git a/tests/worker/test00001.test.ts b/tests/worker/test00001.test.ts index 45608c0..23f5f31 100644 --- a/tests/worker/test00001.test.ts +++ b/tests/worker/test00001.test.ts @@ -28,8 +28,8 @@ it('WorkerCallable: case 1', async () => { WorkerPayloadRequiredError, ); - await worker.quitAsync(); + await worker.shutDownAsync(); // second timer is OK - await worker.quitAsync(); + await worker.shutDownAsync(); }); diff --git a/tests/worker/test00002.test.ts b/tests/worker/test00002.test.ts index d587a30..52c6b6b 100644 --- a/tests/worker/test00002.test.ts +++ b/tests/worker/test00002.test.ts @@ -25,6 +25,8 @@ it('WorkerCallable: case 2', async () => { 'Error code: FILE_READ_ERROR', ); + await bluebird.delay(1000); + await expect(worker.callAsync('Hello world!')).rejects.toThrow( 'Error code: FILE_READ_ERROR', ); @@ -44,7 +46,7 @@ it('WorkerCallable: case 2', async () => { await expect(worker3.callAsync('Hello world!')).rejects.toThrow( 'Error code: PROCESSING_ERROR', ); - await worker3.quitAsync(); + await worker3.shutDownAsync(); const filename4 = resolve(dir, './workers/worker-exception.worker.js'); const worker4 = bluebird.promisifyAll( @@ -53,7 +55,7 @@ it('WorkerCallable: case 2', async () => { await expect(worker4.callAsync('Hello world!')).rejects.toThrow( 'Error code: PROCESSING_CAUGHT_ERROR', ); - await worker4.quitAsync(); + await worker4.shutDownAsync(); const filename5 = resolve(dir, './workers/worker-faulty.worker.js'); const worker5 = bluebird.promisifyAll( diff --git a/tests/worker/test00003.test.ts b/tests/worker/test00003.test.ts index eb0da5a..7859fd8 100644 --- a/tests/worker/test00003.test.ts +++ b/tests/worker/test00003.test.ts @@ -12,7 +12,10 @@ import bluebird from 'bluebird'; import { EventEmitter } from 'events'; import { resolve } from 'node:path'; import { getDirname } from '../../src/env/environment.js'; -import { EWorkerThreadExecutionCode, EWorkerType } from '../../types/index.js'; +import { + EWorkerThreadExecutionCode, + EWorkerType, +} from '../../src/worker/index.js'; import { mockModule } from '../mock-module.js'; const dir = getDirname(); diff --git a/tests/worker/test00004.test.ts b/tests/worker/test00004.test.ts index 4a07627..98d78fe 100644 --- a/tests/worker/test00004.test.ts +++ b/tests/worker/test00004.test.ts @@ -12,7 +12,10 @@ import bluebird from 'bluebird'; import { EventEmitter } from 'events'; import { resolve } from 'node:path'; import { getDirname } from '../../src/env/environment.js'; -import { EWorkerThreadExecutionCode, EWorkerType } from '../../types/index.js'; +import { + EWorkerThreadExecutionCode, + EWorkerType, +} from '../../src/worker/index.js'; import { mockModule } from '../mock-module.js'; const dir = getDirname(); diff --git a/tests/worker/test00005.test.ts b/tests/worker/test00005.test.ts index b30a645..f02110e 100644 --- a/tests/worker/test00005.test.ts +++ b/tests/worker/test00005.test.ts @@ -12,7 +12,10 @@ import bluebird from 'bluebird'; import { EventEmitter } from 'events'; import { resolve } from 'node:path'; import { getDirname } from '../../src/env/environment.js'; -import { EWorkerThreadExecutionCode, EWorkerType } from '../../types/index.js'; +import { + EWorkerThreadExecutionCode, + EWorkerType, +} from '../../src/worker/index.js'; import { mockModule } from '../mock-module.js'; const dir = getDirname(); diff --git a/tests/worker/test00006.test.ts b/tests/worker/test00006.test.ts index eabff68..af98fe1 100644 --- a/tests/worker/test00006.test.ts +++ b/tests/worker/test00006.test.ts @@ -12,7 +12,7 @@ import bluebird from 'bluebird'; import { EventEmitter } from 'events'; import { resolve } from 'node:path'; import { getDirname } from '../../src/env/environment.js'; -import { EWorkerThreadExitCode, EWorkerType } from '../../types/index.js'; +import { EWorkerThreadExitCode, EWorkerType } from '../../src/worker/index.js'; import { mockModule } from '../mock-module.js'; const dir = getDirname(); diff --git a/tests/worker/test00007.test.ts b/tests/worker/test00007.test.ts index 0ed715c..51180c8 100644 --- a/tests/worker/test00007.test.ts +++ b/tests/worker/test00007.test.ts @@ -12,7 +12,7 @@ import bluebird from 'bluebird'; import { EventEmitter } from 'events'; import { resolve } from 'node:path'; import { getDirname } from '../../src/env/environment.js'; -import { EWorkerThreadExitCode, EWorkerType } from '../../types/index.js'; +import { EWorkerThreadExitCode, EWorkerType } from '../../src/worker/index.js'; import { mockModule } from '../mock-module.js'; const dir = getDirname(); diff --git a/tests/worker/test00008.test.ts b/tests/worker/test00008.test.ts index 13980bc..091b901 100644 --- a/tests/worker/test00008.test.ts +++ b/tests/worker/test00008.test.ts @@ -10,7 +10,7 @@ import { expect, it, jest } from '@jest/globals'; import bluebird from 'bluebird'; import { EventEmitter } from 'events'; -import { EWorkerThreadExitCode } from '../../types/index.js'; +import { EWorkerThreadExitCode } from '../../src/worker/index.js'; import { mockModule } from '../mock-module.js'; it('WorkerCallable: case 8', async () => { diff --git a/tests/worker/test00009.test.ts b/tests/worker/test00009.test.ts index 7fa73e7..2a095e1 100644 --- a/tests/worker/test00009.test.ts +++ b/tests/worker/test00009.test.ts @@ -12,7 +12,7 @@ import bluebird from 'bluebird'; import { EventEmitter } from 'events'; import { resolve } from 'node:path'; import { getDirname } from '../../src/env/environment.js'; -import { EWorkerThreadExitCode, EWorkerType } from '../../types/index.js'; +import { EWorkerThreadExitCode, EWorkerType } from '../../src/worker/index.js'; import { mockModule } from '../mock-module.js'; const dir = getDirname(); diff --git a/tests/worker/test00010.test.ts b/tests/worker/test00010.test.ts index 93f8232..7b71a4e 100644 --- a/tests/worker/test00010.test.ts +++ b/tests/worker/test00010.test.ts @@ -12,7 +12,7 @@ import bluebird from 'bluebird'; import { EventEmitter } from 'events'; import { resolve } from 'node:path'; import { getDirname } from '../../src/env/environment.js'; -import { EWorkerThreadExitCode, EWorkerType } from '../../types/index.js'; +import { EWorkerThreadExitCode, EWorkerType } from '../../src/worker/index.js'; import { mockModule } from '../mock-module.js'; const dir = getDirname(); diff --git a/tests/worker/test00011.test.ts b/tests/worker/test00011.test.ts index 59d149a..03d4db3 100644 --- a/tests/worker/test00011.test.ts +++ b/tests/worker/test00011.test.ts @@ -20,7 +20,7 @@ import { WorkerRunnable } from '../../src/worker/worker-runnable.js'; const dir = getDirname(); it('WorkerRunnable', async () => { - const filename = resolve(dir, './workers/worker-runnable-ok.worker.js'); + const filename = resolve(dir, './workers/runnable/runnable1.worker.js'); const worker = bluebird.promisifyAll(new WorkerRunnable(filename)); // will emit an error upon shutdown worker.on('worker.error', (err) => { @@ -32,9 +32,9 @@ it('WorkerRunnable', async () => { WorkerAlreadyRunningError, ); - await worker.quitAsync(); + await worker.shutDownAsync(); - await expect(async () => worker.quitAsync()).rejects.toThrow( + await expect(async () => worker.shutDownAsync()).rejects.toThrow( WorkerAlreadyDownError, ); }); diff --git a/tests/worker/test00012.test.ts b/tests/worker/test00012.test.ts index 29a4084..2abfcf0 100644 --- a/tests/worker/test00012.test.ts +++ b/tests/worker/test00012.test.ts @@ -22,7 +22,7 @@ it('WorkerResourceGroup: addWorker()', async () => { new WorkerResourceGroup(redisClient, console, 'mygroupid'), ); - const filename = resolve(dir, './workers/worker-runnable-ok.worker.js'); + const filename = resolve(dir, './workers/runnable/runnable1.worker.js'); workerRunnableResourceGroup.addWorker(filename, 'hello world'); await workerRunnableResourceGroup.runAsync(); diff --git a/tests/worker/test00013.test.ts b/tests/worker/test00013.test.ts index 9dfdf1b..8991cc2 100644 --- a/tests/worker/test00013.test.ts +++ b/tests/worker/test00013.test.ts @@ -22,7 +22,7 @@ it('WorkerResourceGroup: loadFromDir()', async () => { new WorkerResourceGroup(redisClient, console, 'mygroupid'), ); - const workersPath = resolve(dir, './workers'); + const workersPath = resolve(dir, './workers/runnable'); await workerRunnableResourceGroup.loadFromDirAsync( workersPath, 'hello world', diff --git a/tests/worker/workers/worker-runnable-ok.worker.ts b/tests/worker/workers/runnable/runnable1.worker.ts similarity index 74% rename from tests/worker/workers/worker-runnable-ok.worker.ts rename to tests/worker/workers/runnable/runnable1.worker.ts index edcc1cc..7b673ba 100644 --- a/tests/worker/workers/worker-runnable-ok.worker.ts +++ b/tests/worker/workers/runnable/runnable1.worker.ts @@ -7,8 +7,9 @@ * in the root directory of this source tree. */ -import { ICallback } from '../../../types/index.js'; +import { ICallback } from '../../../../src/common/index.js'; export default function myWorkerRunnable(msg: string, cb: ICallback) { - setTimeout(() => cb(), 5000); + setInterval(() => void 0, 1000); + cb(); } diff --git a/tests/worker/workers/runnable/runnable2.worker.ts b/tests/worker/workers/runnable/runnable2.worker.ts new file mode 100644 index 0000000..7b673ba --- /dev/null +++ b/tests/worker/workers/runnable/runnable2.worker.ts @@ -0,0 +1,15 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ICallback } from '../../../../src/common/index.js'; + +export default function myWorkerRunnable(msg: string, cb: ICallback) { + setInterval(() => void 0, 1000); + cb(); +} diff --git a/tests/worker/workers/worker-error.worker.ts b/tests/worker/workers/worker-error.worker.ts index 9c4fc4b..f20b867 100644 --- a/tests/worker/workers/worker-error.worker.ts +++ b/tests/worker/workers/worker-error.worker.ts @@ -7,7 +7,7 @@ * in the root directory of this source tree. */ -import { ICallback } from '../../../types/index.js'; +import { ICallback } from '../../../src/common/index.js'; export default function myWorkerCallable(msg: string, cb: ICallback) { cb(new Error('MY_ERROR')); diff --git a/tests/worker/workers/worker-ok.worker.ts b/tests/worker/workers/worker-ok.worker.ts index a087f9d..ecea1cd 100644 --- a/tests/worker/workers/worker-ok.worker.ts +++ b/tests/worker/workers/worker-ok.worker.ts @@ -7,7 +7,7 @@ * in the root directory of this source tree. */ -import { ICallback } from '../../../types/index.js'; +import { ICallback } from '../../../src/common/index.js'; export default function myWorkerCallable( payload: string, diff --git a/types/event/event-bus.ts b/types/event/event-bus.ts deleted file mode 100644 index 929533d..0000000 --- a/types/event/event-bus.ts +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { ICallback } from '../common.js'; -import { IEventEmitter, TEventEmitterEvent } from './event-emitter.js'; - -export interface IEventBus - extends IEventEmitter { - disconnect(cb: ICallback): void; -}