Skip to content

Commit

Permalink
refactor(QueueMessages)!: move message methods to Message,add Message…
Browse files Browse the repository at this point in the history
…Envelope
  • Loading branch information
weyoss committed Dec 10, 2023
1 parent 1dbf28f commit f71f0d0
Show file tree
Hide file tree
Showing 25 changed files with 677 additions and 624 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Considerations:

## Usage

RedisSMQ provides 3 classes in order to work with the message queue: `Message`, `Producer`, and `Consumer`.
RedisSMQ provides 3 classes in order to work with the message queue: `MessageEnvelope`, `Producer`, and `Consumer`.

Producers and consumers exchange data using one or multiple queues that may be created using the [Queue Class](docs/api/classes/Queue.md).

Expand All @@ -76,11 +76,11 @@ queue.save('my_queue', EQueueType.LIFO_QUEUE, (err) => console.log(err));
### Producing a message

```javascript
const { Producer, Message } = require('redis-smq');
const { Producer, MessageEnvelope } = require('redis-smq');

const producer = new Producer();

const message = new Message();
const message = new MessageEnvelope();
message.setQueue('my_queue').setBody('Hello Word!')

producer.produce(message, (err) => console.log(err));
Expand Down
1 change: 1 addition & 0 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from './src/lib/message/errors';
export { Consumer } from './src/lib/consumer/consumer';
export { Producer } from './src/lib/producer/producer';
export { Message } from './src/lib/message/message';
export { MessageEnvelope } from './src/lib/message/message-envelope';
export { ExchangeDirect } from './src/lib/exchange/exchange-direct';
export { ExchangeTopic } from './src/lib/exchange/exchange-topic';
export { ExchangeFanOut } from './src/lib/exchange/exchange-fan-out';
Expand Down
6 changes: 4 additions & 2 deletions misc/health-check/combined-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { async, ICallback } from 'redis-smq-common';
import {
Producer,
Consumer,
Message,
MessageEnvelope,
Queue,
disconnect,
EQueueType,
Expand All @@ -24,7 +24,9 @@ const produceForever = (err?: Error | null) => {
if (err) console.log(err);
else {
if (producer.isGoingUp() || producer.isRunning()) {
const message = new Message().setBody('some data').setQueue(queueName);
const message = new MessageEnvelope()
.setBody('some data')
.setQueue(queueName);
producer.produce(message, produceForever);
}
}
Expand Down
13 changes: 8 additions & 5 deletions src/lib/consumer/message-handler/consume-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../message/message';
import { MessageEnvelope } from '../../message/message-envelope';
import {
EConsumeMessageUnacknowledgedCause,
EMessageProperty,
Expand Down Expand Up @@ -48,7 +48,10 @@ export class ConsumeMessage {
this.logger = logger;
}

protected acknowledgeMessage(message: Message, cb: ICallback<void>): void {
protected acknowledgeMessage(
message: MessageEnvelope,
cb: ICallback<void>,
): void {
const messageId = message.getRequiredId();
const queue = message.getDestinationQueue();
const { keyQueueAcknowledged } = redisKeys.getQueueKeys(queue);
Expand All @@ -70,7 +73,7 @@ export class ConsumeMessage {
}

protected unacknowledgeMessage(
msg: Message,
msg: MessageEnvelope,
cause: EConsumeMessageUnacknowledgedCause,
): void {
processingQueue.handleProcessingQueue(
Expand Down Expand Up @@ -127,7 +130,7 @@ export class ConsumeMessage {
);
}

protected consumeMessage(msg: Message): void {
protected consumeMessage(msg: MessageEnvelope): void {
let isTimeout = false;
let timer: NodeJS.Timeout | null = null;
try {
Expand Down Expand Up @@ -182,7 +185,7 @@ export class ConsumeMessage {
}
}

handleReceivedMessage(message: Message): void {
handleReceivedMessage(message: MessageEnvelope): void {
if (message.getSetExpired()) {
this.unacknowledgeMessage(
message,
Expand Down
6 changes: 3 additions & 3 deletions src/lib/consumer/message-handler/processing-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
EConsumeMessageUnacknowledgedCause,
IQueueParams,
} from '../../../../types';
import { Message } from '../../message/message';
import { MessageEnvelope } from '../../message/message-envelope';
import {
async,
RedisClient,
Expand All @@ -29,7 +29,7 @@ import {
} from './retry-message';
import { consumerQueues } from '../consumer-queues';
import { ELuaScriptName } from '../../../common/redis-client/redis-client';
import { _getMessage } from '../../queue/queue-messages/_get-message';
import { _getMessage } from '../../message/_get-message';
import { Configuration } from '../../../config/configuration';
import { ConsumerError } from '../errors';

Expand Down Expand Up @@ -203,7 +203,7 @@ export const processingQueue = {
fetchProcessingQueueMessage(
redisClient: RedisClient,
keyQueueProcessing: string,
cb: ICallback<Message>,
cb: ICallback<MessageEnvelope>,
): void {
redisClient.lrange(
keyQueueProcessing,
Expand Down
4 changes: 2 additions & 2 deletions src/lib/consumer/message-handler/retry-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../message/message';
import { MessageEnvelope } from '../../message/message-envelope';
import {
EConsumeMessageDeadLetterCause,
EConsumeMessageUnacknowledgedCause,
Expand All @@ -29,7 +29,7 @@ export type TGetRetryActionReply =
};

export function getRetryAction(
message: Message,
message: MessageEnvelope,
unacknowledgedCause: EConsumeMessageUnacknowledgedCause,
): TGetRetryActionReply {
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ import {
ICallback,
CallbackEmptyReplyError,
} from 'redis-smq-common';
import { redisKeys } from '../../../common/redis-keys/redis-keys';
import { redisKeys } from '../../common/redis-keys/redis-keys';
import {
EMessageProperty,
EMessagePropertyStatus,
EQueueProperty,
EQueueType,
} from '../../../../types';
import { ELuaScriptName } from '../../../common/redis-client/redis-client';
} from '../../../types';
import { ELuaScriptName } from '../../common/redis-client/redis-client';
import { _getMessage } from './_get-message';
import { QueueDeleteOperationError } from '../errors';
import { MessageDeleteError } from './errors/message-delete.error';

export function _deleteMessage(
redisClient: RedisClient,
Expand Down Expand Up @@ -94,11 +94,7 @@ export function _deleteMessage(
(err, reply) => {
if (err) cb(err);
else if (reply !== 'OK')
cb(
new QueueDeleteOperationError(
reply ? String(reply) : undefined,
),
);
cb(new MessageDeleteError(reply ? String(reply) : undefined));
else cb();
},
);
Expand Down
8 changes: 4 additions & 4 deletions src/lib/message/_from-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ import {
IMessageSerialized,
IMessageStateSerialized,
} from '../../../types';
import { Message } from './message';
import { MessageEnvelope } from './message-envelope';
import { _fromJSON } from '../exchange/_from-json';

export function _fromMessage(
msg: string | Message,
msg: string | MessageEnvelope,
status: EMessagePropertyStatus | null,
msgState: string | MessageState | null,
): Message {
): MessageEnvelope {
const { exchange, ...params }: IMessageSerialized =
typeof msg === 'string' ? JSON.parse(msg) : msg.toJSON();

// Properties
const m = new Message();
const m = new MessageEnvelope();
Object.assign(m, params);

// Status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@
*/

import { async, RedisClient, ICallback } from 'redis-smq-common';
import { Message } from '../../message/message';
import { redisKeys } from '../../../common/redis-keys/redis-keys';
import { QueueMessageNotFoundError } from '../errors';
import { EMessageProperty } from '../../../../types';
import { _fromMessage } from '../../message/_from-message';
import { MessageEnvelope } from './message-envelope';
import { redisKeys } from '../../common/redis-keys/redis-keys';
import { EMessageProperty } from '../../../types';
import { _fromMessage } from './_from-message';
import { MessageNotFoundError } from './errors/message-not-found.error';

export function _getMessage(
redisClient: RedisClient,
messageId: string,
cb: ICallback<Message>,
cb: ICallback<MessageEnvelope>,
): void {
const { keyMessage } = redisKeys.getMessageKeys(messageId);
redisClient.hgetall(keyMessage, (err, reply) => {
if (err) cb(err);
else if (!reply || !Object.keys(reply).length)
cb(new QueueMessageNotFoundError());
cb(new MessageNotFoundError());
else
cb(
null,
Expand All @@ -39,17 +39,17 @@ export function _getMessage(
export function _getMessages(
redisClient: RedisClient,
messageIds: string[],
cb: ICallback<Message[]>,
cb: ICallback<MessageEnvelope[]>,
): void {
const messages: Message[] = [];
const messages: MessageEnvelope[] = [];
async.each(
messageIds,
(id, index, done) => {
const { keyMessage } = redisKeys.getMessageKeys(id);
redisClient.hgetall(keyMessage, (err, reply) => {
if (err) done(err);
else if (!reply || !Object.keys(reply).length) {
done(new QueueMessageNotFoundError());
done(new MessageNotFoundError());
} else {
const msg = _fromMessage(
reply[EMessageProperty.MESSAGE],
Expand Down
16 changes: 16 additions & 0 deletions src/lib/message/errors/message-delete.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { MessageError } from './message.error';

export class MessageDeleteError extends MessageError {
constructor(msg?: string) {
super(msg ?? `MESSAGE_DELETE_ERROR`);
}
}
16 changes: 16 additions & 0 deletions src/lib/message/errors/message-not-found.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { MessageError } from './message.error';

export class MessageNotFoundError extends MessageError {
constructor(msg?: string) {
super(msg ?? `MESSAGE_NOT_FOUND`);
}
}
Loading

0 comments on commit f71f0d0

Please sign in to comment.