Skip to content

Commit

Permalink
feat!: add ProducibleMessage/ConsumableMessage/MessageEnvelope
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Dec 17, 2023
1 parent e6ae499 commit 4a3eec7
Show file tree
Hide file tree
Showing 24 changed files with 683 additions and 501 deletions.
2 changes: 1 addition & 1 deletion index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export * from './src/lib/message/errors';
export { Consumer } from './src/lib/consumer/consumer';
export { Producer } from './src/lib/producer/producer';
export { Message } from './src/lib/message/message';
export { MessageEnvelope } from './src/lib/message/message-envelope';
export { ProducibleMessage } from './src/lib/message/producible-message';
export { ExchangeDirect } from './src/lib/exchange/exchange-direct';
export { ExchangeTopic } from './src/lib/exchange/exchange-topic';
export { ExchangeFanOut } from './src/lib/exchange/exchange-fan-out';
Expand Down
18 changes: 6 additions & 12 deletions src/lib/consumer/message-handler/consume-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import {
import { processingQueue } from './processing-queue';
import { ERetryAction } from './retry-message';
import { ELuaScriptName } from '../../../common/redis-client/redis-client';
import { _fromMessage } from '../../message/_from-message';
import { Configuration } from '../../../config/configuration';
import { _createRMessage } from '../../message/_create-r-message';

export class ConsumeMessage {
protected keyQueueProcessing: string;
Expand All @@ -52,7 +52,7 @@ export class ConsumeMessage {
message: MessageEnvelope,
cb: ICallback<void>,
): void {
const messageId = message.getRequiredId();
const messageId = message.getId();
const queue = message.getDestinationQueue();
const { keyQueueAcknowledged } = redisKeys.getQueueKeys(queue);
const { store, queueSize, expire } =
Expand Down Expand Up @@ -87,7 +87,7 @@ export class ConsumeMessage {
else if (!reply)
this.messageHandler.handleError(new CallbackEmptyReplyError());
else {
const messageId = msg.getRequiredId();
const messageId = msg.getId();
const queue = msg.getDestinationQueue();
const messageHandlerId = this.messageHandler.getId();
const consumerId = this.messageHandler.getConsumerId();
Expand Down Expand Up @@ -134,7 +134,7 @@ export class ConsumeMessage {
let isTimeout = false;
let timer: NodeJS.Timeout | null = null;
try {
const consumeTimeout = msg.getConsumeTimeout();
const consumeTimeout = msg.producibleMessage.getConsumeTimeout();
if (consumeTimeout) {
timer = setTimeout(() => {
isTimeout = true;
Expand All @@ -160,7 +160,7 @@ export class ConsumeMessage {
else
this.messageHandler.emit(
'messageAcknowledged',
msg.getRequiredId(),
msg.getId(),
msg.getDestinationQueue(),
this.messageHandler.getId(),
this.messageHandler.getConsumerId(),
Expand All @@ -169,13 +169,7 @@ export class ConsumeMessage {
}
}
};

// As a safety measure, in case if we mess with message system
// properties, only a clone of the message is actually given
this.messageHandler.getHandler()(
_fromMessage(msg, msg.getStatus(), msg.getMessageState()),
onConsumed,
);
this.messageHandler.getHandler()(_createRMessage(msg), onConsumed);
} catch (error: unknown) {
this.logger.error(error);
this.unacknowledgeMessage(
Expand Down
2 changes: 1 addition & 1 deletion src/lib/consumer/message-handler/processing-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export const processingQueue = {
if (err) done(err);
else {
if (message) {
const messageId = message.getRequiredId();
const messageId = message.getId();
args.push(messageId);
const { keyMessage } =
redisKeys.getMessageKeys(messageId);
Expand Down
2 changes: 1 addition & 1 deletion src/lib/consumer/message-handler/retry-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export function getRetryAction(
deadLetterCause: EConsumeMessageDeadLetterCause.RETRY_THRESHOLD_EXCEEDED,
};
}
const delay = message.getRetryDelay();
const delay = message.producibleMessage.getRetryDelay();
if (delay) {
return {
action: ERetryAction.DELAY,
Expand Down
115 changes: 115 additions & 0 deletions src/lib/message/_create-r-message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { MessageEnvelope } from './message-envelope';
import {
EMessagePriority,
EMessagePropertyStatus,
IMessageSerialized,
IConsumableMessage,
IQueueParams,
TExchange,
TTopicParams,
} from '../../../types';

export function _createRMessage(msg: MessageEnvelope): IConsumableMessage {
return {
getPublishedAt(): number | null {
return msg.getPublishedAt();
},

getScheduledAt(): number | null {
return msg.getScheduledAt();
},

getScheduledMessageId(): string | null {
return msg.getScheduledMessageId();
},

getId(): string {
return msg.getId();
},

getStatus(): EMessagePropertyStatus {
return msg.getStatus();
},

hasPriority(): boolean {
return msg.producibleMessage.hasPriority();
},

getQueue(): IQueueParams | string | null {
return msg.producibleMessage.getQueue();
},

getDestinationQueue(): IQueueParams {
return msg.getDestinationQueue();
},

getPriority(): EMessagePriority | null {
return msg.producibleMessage.getPriority();
},

getBody(): unknown {
return msg.producibleMessage.getBody();
},

getTTL(): number {
return msg.producibleMessage.getTTL();
},

getRetryThreshold(): number {
return msg.producibleMessage.getRetryThreshold();
},

getRetryDelay(): number {
return msg.producibleMessage.getRetryDelay();
},

getConsumeTimeout(): number {
return msg.producibleMessage.getConsumeTimeout();
},

getCreatedAt(): number {
return msg.producibleMessage.getCreatedAt();
},

getScheduledRepeat(): number {
return msg.producibleMessage.getScheduledRepeat();
},

getScheduledRepeatPeriod(): number | null {
return msg.producibleMessage.getScheduledRepeatPeriod();
},

getScheduledCRON(): string | null {
return msg.producibleMessage.getScheduledCRON();
},

getScheduledDelay(): number | null {
return msg.producibleMessage.getScheduledDelay();
},

getFanOut(): string | null {
return msg.producibleMessage.getFanOut();
},

getTopic(): TTopicParams | string | null {
return msg.producibleMessage.getTopic();
},

toJSON(): IMessageSerialized {
return msg.toJSON();
},

getExchange(): TExchange {
return msg.getExchange();
},
};
}
2 changes: 1 addition & 1 deletion src/lib/message/_delete-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
} from '../../../types';
import { ELuaScriptName } from '../../common/redis-client/redis-client';
import { _getMessage } from './_get-message';
import { MessageDeleteError } from './errors/message-delete.error';
import { MessageDeleteError } from './errors';

export function _deleteMessage(
redisClient: RedisClient,
Expand Down
16 changes: 9 additions & 7 deletions src/lib/message/_from-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@ import {
} from '../../../types';
import { MessageEnvelope } from './message-envelope';
import { _fromJSON } from '../exchange/_from-json';
import { ProducibleMessage } from './producible-message';

export function _fromMessage(
msg: string | MessageEnvelope,
status: EMessagePropertyStatus | null,
msgState: string | MessageState | null,
): MessageEnvelope {
const { exchange, ...params }: IMessageSerialized =
const { exchange, destinationQueue, ...params }: IMessageSerialized =
typeof msg === 'string' ? JSON.parse(msg) : msg.toJSON();

// Properties
const m = new MessageEnvelope();
Object.assign(m, params);
const messagePub = new ProducibleMessage();
Object.assign(messagePub, params);
messagePub.setExchange(_fromJSON(exchange));

//
const m = new MessageEnvelope(messagePub);
m.setDestinationQueue(destinationQueue);

// Status
if (status !== null) {
Expand All @@ -42,8 +47,5 @@ export function _fromMessage(
m.setMessageState(messageStateInstance);
}

// Exchange
if (exchange) m.setExchange(_fromJSON(exchange));

return m;
}
26 changes: 26 additions & 0 deletions src/lib/message/_get-message-state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { ICallback, RedisClient } from 'redis-smq-common';
import { redisKeys } from '../../common/redis-keys/redis-keys';
import { EMessageProperty, IMessageStateSerialized } from '../../../types';
import { MessageNotFoundError } from './errors';

export function _getMessageState(
redisClient: RedisClient,
messageId: string,
cb: ICallback<IMessageStateSerialized>,
): void {
const { keyMessage } = redisKeys.getMessageKeys(messageId);
redisClient.hget(keyMessage, String(EMessageProperty.STATE), (err, reply) => {
if (err) cb(err);
else if (!reply) cb(new MessageNotFoundError());
else cb(null, JSON.parse(reply));
});
}

0 comments on commit 4a3eec7

Please sign in to comment.