Skip to content

Commit

Permalink
feat(events)!: use typed events, remove legacy events
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Nov 26, 2023
1 parent 8e3c228 commit 5f5a34f
Show file tree
Hide file tree
Showing 18 changed files with 145 additions and 106 deletions.
1 change: 0 additions & 1 deletion index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ export * from './src/lib/queue/errors';
export * from './src/lib/exchange/errors';
export * from './src/lib/producer/errors';
export * from './src/lib/message/errors';
export { events } from './src/common/events/events';
export { Consumer } from './src/lib/consumer/consumer';
export { Producer } from './src/lib/producer/producer';
export { Message } from './src/lib/message/message';
Expand Down
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"jest": "27.2.4",
"lint-staged": "11.1.2",
"prettier": "3.0.3",
"redis-smq-common": "3.0.0-rc.6",
"redis-smq-common": "3.0.0-rc.7",
"supertest": "6.1.6",
"type-coverage": "2.27.0",
"typescript": "4.9.4"
Expand Down
22 changes: 0 additions & 22 deletions src/common/events/events.ts

This file was deleted.

26 changes: 13 additions & 13 deletions src/lib/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
*/

import { v4 as uuid } from 'uuid';
import { EventEmitter } from 'events';
import { IEventListener } from '../../types';
import { events } from '../common/events/events';
import {
async,
redis,
Expand All @@ -23,10 +21,12 @@ import {
ILogger,
CallbackEmptyReplyError,
PanicError,
EventEmitter,
} from 'redis-smq-common';
import { Configuration } from '../config/configuration';
import { TRedisSMQEvent } from '../../types';

export abstract class Base extends EventEmitter {
export abstract class Base extends EventEmitter<TRedisSMQEvent> {
protected readonly id: string;
protected readonly powerSwitch: PowerSwitch;
protected sharedRedisClient: RedisClient | null = null;
Expand Down Expand Up @@ -65,11 +65,11 @@ export abstract class Base extends EventEmitter {
};

protected registerSystemEventListeners(): void {
this.on(events.GOING_UP, () => this.logger.info(`Going up...`));
this.on(events.UP, () => this.logger.info(`Up and running...`));
this.on(events.GOING_DOWN, () => this.logger.info(`Going down...`));
this.on(events.DOWN, () => this.logger.info(`Down.`));
this.on(events.ERROR, (err: Error) => this.handleError(err));
this.on('goingUp', () => this.logger.info(`Going up...`));
this.on('up', () => this.logger.info(`Up and running...`));
this.on('goingDown', () => this.logger.info(`Going down...`));
this.on('down', () => this.logger.info(`Down.`));
this.on('error', (err: Error) => this.handleError(err));
}

protected goingUp(): TFunction[] {
Expand All @@ -78,7 +78,7 @@ export abstract class Base extends EventEmitter {

protected up(cb?: ICallback<boolean>): void {
this.powerSwitch.commit();
this.emit(events.UP);
this.emit('up');
cb && cb(null, true);
}

Expand All @@ -88,7 +88,7 @@ export abstract class Base extends EventEmitter {

protected down(cb?: ICallback<boolean>): void {
this.powerSwitch.commit();
this.emit(events.DOWN);
this.emit('down');
cb && cb(null, true);
}

Expand Down Expand Up @@ -147,12 +147,12 @@ export abstract class Base extends EventEmitter {
run(cb?: ICallback<boolean>): void {
const r = this.powerSwitch.goingUp();
if (r) {
this.emit(events.GOING_UP);
this.emit('goingUp');
const tasks = this.goingUp();
async.waterfall(tasks, (err) => {
if (err) {
if (cb) cb(err);
else this.emit(events.ERROR, err);
else this.emit('error', err);
} else this.up(cb);
});
} else {
Expand All @@ -163,7 +163,7 @@ export abstract class Base extends EventEmitter {
shutdown(cb?: ICallback<boolean>): void {
const r = this.powerSwitch.goingDown();
if (r) {
this.emit(events.GOING_DOWN);
this.emit('goingDown');
const tasks = this.goingDown();
async.waterfall(tasks, () => {
// ignoring shutdown errors
Expand Down
12 changes: 6 additions & 6 deletions src/lib/consumer/consumer-heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import {
ICallback,
IRedisTransaction,
CallbackInvalidReplyError,
EventEmitter,
} from 'redis-smq-common';
import { events } from '../../common/events/events';
import { redisKeys } from '../../common/redis-keys/redis-keys';
import { EventEmitter } from 'events';
import { Consumer } from './consumer';
import { TRedisSMQEvent } from '../../../types';

const cpuUsageStatsRef = {
cpuUsage: process.cpuUsage(),
Expand Down Expand Up @@ -55,7 +55,7 @@ function cpuUsage() {
};
}

export class ConsumerHeartbeat extends EventEmitter {
export class ConsumerHeartbeat extends EventEmitter<TRedisSMQEvent> {
protected static readonly heartbeatTTL = 10 * 1000; // 10 sec
protected redisClient: RedisClient;
protected ticker: Ticker;
Expand Down Expand Up @@ -103,10 +103,10 @@ export class ConsumerHeartbeat extends EventEmitter {
multi.hset(this.keyHeartbeats, this.consumer.getId(), heartbeatPayloadStr);
multi.zadd(this.keyHeartbeatTimestamps, timestamp, this.consumer.getId());
multi.exec((err) => {
if (err) this.emit(events.ERROR, err);
if (err) this.emit('error', err);
else {
this.emit(
events.TICK,
'heartbeatTick',
timestamp,
this.consumer.getId(),
heartbeatPayload,
Expand All @@ -120,7 +120,7 @@ export class ConsumerHeartbeat extends EventEmitter {
async.waterfall(
[
(cb: ICallback<void>) => {
this.ticker.once(events.DOWN, cb);
this.ticker.once('down', cb);
this.ticker.quit();
},
(cb: ICallback<void>) => {
Expand Down
13 changes: 4 additions & 9 deletions src/lib/consumer/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
TConsumerRedisKeys,
IQueueParams,
} from '../../../types';
import { events } from '../../common/events/events';
import { redisKeys } from '../../common/redis-keys/redis-keys';
import { ConsumerHeartbeat } from './consumer-heartbeat';
import { Base } from '../base';
Expand Down Expand Up @@ -64,10 +63,8 @@ export class Consumer extends Base {
this,
this.redisKeys,
);
this.heartbeat.on(events.ERROR, (err: Error) =>
this.emit(events.ERROR, err),
);
this.heartbeat.once(events.TICK, () => cb());
this.heartbeat.on('error', (err) => this.emit('error', err));
this.heartbeat.once('heartbeatTick', () => cb());
}
},
);
Expand Down Expand Up @@ -95,10 +92,8 @@ export class Consumer extends Base {
new WorkerPool(),
nsLogger,
);
this.workerRunner.on(events.ERROR, (err: Error) =>
this.emit(events.ERROR, err),
);
this.workerRunner.once(events.UP, cb);
this.workerRunner.on('error', (...args) => this.emit('error', ...args));
this.workerRunner.once('up', cb);
this.workerRunner.addWorker(
new DelayUnacknowledgedWorker(redisClient, true),
);
Expand Down
11 changes: 5 additions & 6 deletions src/lib/consumer/message-handler/consume-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
EMessageProperty,
EMessagePropertyStatus,
} from '../../../../types';
import { events } from '../../../common/events/events';
import { redisKeys } from '../../../common/redis-keys/redis-keys';
import { MessageHandler } from './message-handler';
import {
Expand Down Expand Up @@ -90,7 +89,7 @@ export class ConsumeMessage {
const messageHandlerId = this.messageHandler.getId();
const consumerId = this.messageHandler.getConsumerId();
this.messageHandler.emit(
events.MESSAGE_UNACKNOWLEDGED,
'messageUnacknowledged',
cause,
messageId,
queue,
Expand All @@ -99,7 +98,7 @@ export class ConsumeMessage {
);
if (reply.action === ERetryAction.DEAD_LETTER) {
this.messageHandler.emit(
events.MESSAGE_DEAD_LETTERED,
'messageDeadLettered',
reply.deadLetterCause,
messageId,
queue,
Expand All @@ -108,15 +107,15 @@ export class ConsumeMessage {
);
} else if (reply.action === ERetryAction.DELAY) {
this.messageHandler.emit(
events.MESSAGE_DELAYED,
'messageDelayed',
messageId,
queue,
messageHandlerId,
consumerId,
);
} else {
this.messageHandler.emit(
events.MESSAGE_REQUEUED,
'messageRequeued',
messageId,
queue,
messageHandlerId,
Expand Down Expand Up @@ -157,7 +156,7 @@ export class ConsumeMessage {
if (err) this.messageHandler.handleError(err);
else
this.messageHandler.emit(
events.MESSAGE_ACKNOWLEDGED,
'messageAcknowledged',
msg.getRequiredId(),
msg.getDestinationQueue(),
this.messageHandler.getId(),
Expand Down
7 changes: 3 additions & 4 deletions src/lib/consumer/message-handler/dequeue-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import {
RedisClient,
Ticker,
} from 'redis-smq-common';
import { events } from '../../../common/events/events';
import { MessageHandler } from './message-handler';
import { QueueRateLimit } from '../../queue/queue-rate-limit';
import { ELuaScriptName } from '../../../common/redis-client/redis-client';
Expand Down Expand Up @@ -69,7 +68,7 @@ export class DequeueMessage {
this.consumerId,
);
this.ticker = new Ticker(() => {
this.messageHandler.emit(events.MESSAGE_NEXT);
this.messageHandler.emit('next');
});
}

Expand All @@ -79,7 +78,7 @@ export class DequeueMessage {
this.messageHandler.handleError(err);
} else if (typeof messageId === 'string') {
this.messageHandler.emit(
events.MESSAGE_RECEIVED,
'messageReceived',
messageId,
this.queue,
this.consumerId,
Expand Down Expand Up @@ -212,7 +211,7 @@ export class DequeueMessage {
}

quit(cb: ICallback<void>): void {
this.ticker.once(events.DOWN, cb);
this.ticker.once('down', cb);
this.ticker.quit();
}
}
21 changes: 10 additions & 11 deletions src/lib/consumer/message-handler/message-handler-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import { Consumer } from '../consumer';
import { MessageHandler } from './message-handler';
import { events } from '../../../common/events/events';
import {
EConsumeMessageUnacknowledgedCause,
TConsumerMessageHandler,
Expand Down Expand Up @@ -41,20 +40,20 @@ export class MessageHandlerRunner {
}

protected registerMessageHandlerEvents(messageHandler: MessageHandler): void {
messageHandler.on(events.ERROR, (...args: unknown[]) =>
this.consumer.emit(events.ERROR, ...args),
messageHandler.on('error', (...args) =>
this.consumer.emit('error', ...args),
);
messageHandler.on(events.MESSAGE_UNACKNOWLEDGED, (...args: unknown[]) =>
this.consumer.emit(events.MESSAGE_UNACKNOWLEDGED, ...args),
messageHandler.on('messageUnacknowledged', (...args) =>
this.consumer.emit('messageUnacknowledged', ...args),
);
messageHandler.on(events.MESSAGE_DEAD_LETTERED, (...args: unknown[]) =>
this.consumer.emit(events.MESSAGE_DEAD_LETTERED, ...args),
messageHandler.on('messageDeadLettered', (...args) =>
this.consumer.emit('messageDeadLettered', ...args),
);
messageHandler.on(events.MESSAGE_ACKNOWLEDGED, (...args: unknown[]) =>
this.consumer.emit(events.MESSAGE_ACKNOWLEDGED, ...args),
messageHandler.on('messageAcknowledged', (...args) =>
this.consumer.emit('messageAcknowledged', ...args),
);
messageHandler.on(events.MESSAGE_RECEIVED, (...args: unknown[]) =>
this.consumer.emit(events.MESSAGE_RECEIVED, ...args),
messageHandler.on('messageReceived', (...args) =>
this.consumer.emit('messageReceived', ...args),
);
}

Expand Down

0 comments on commit 5f5a34f

Please sign in to comment.