diff --git a/index.ts b/index.ts index 6b1578f4..bc11d6fd 100755 --- a/index.ts +++ b/index.ts @@ -19,7 +19,7 @@ export * from './src/lib/message/errors'; export { Consumer } from './src/lib/consumer/consumer'; export { Producer } from './src/lib/producer/producer'; export { Message } from './src/lib/message/message'; -export { MessageEnvelope } from './src/lib/message/message-envelope'; +export { ProducibleMessage } from './src/lib/message/producible-message'; export { ExchangeDirect } from './src/lib/exchange/exchange-direct'; export { ExchangeTopic } from './src/lib/exchange/exchange-topic'; export { ExchangeFanOut } from './src/lib/exchange/exchange-fan-out'; diff --git a/src/lib/consumer/message-handler/consume-message.ts b/src/lib/consumer/message-handler/consume-message.ts index f0f325fa..51a787d3 100644 --- a/src/lib/consumer/message-handler/consume-message.ts +++ b/src/lib/consumer/message-handler/consume-message.ts @@ -24,8 +24,8 @@ import { import { processingQueue } from './processing-queue'; import { ERetryAction } from './retry-message'; import { ELuaScriptName } from '../../../common/redis-client/redis-client'; -import { _fromMessage } from '../../message/_from-message'; import { Configuration } from '../../../config/configuration'; +import { _createRMessage } from '../../message/_create-r-message'; export class ConsumeMessage { protected keyQueueProcessing: string; @@ -52,7 +52,7 @@ export class ConsumeMessage { message: MessageEnvelope, cb: ICallback, ): void { - const messageId = message.getRequiredId(); + const messageId = message.getId(); const queue = message.getDestinationQueue(); const { keyQueueAcknowledged } = redisKeys.getQueueKeys(queue); const { store, queueSize, expire } = @@ -87,7 +87,7 @@ export class ConsumeMessage { else if (!reply) this.messageHandler.handleError(new CallbackEmptyReplyError()); else { - const messageId = msg.getRequiredId(); + const messageId = msg.getId(); const queue = msg.getDestinationQueue(); const messageHandlerId = this.messageHandler.getId(); const consumerId = this.messageHandler.getConsumerId(); @@ -134,7 +134,7 @@ export class ConsumeMessage { let isTimeout = false; let timer: NodeJS.Timeout | null = null; try { - const consumeTimeout = msg.getConsumeTimeout(); + const consumeTimeout = msg.producibleMessage.getConsumeTimeout(); if (consumeTimeout) { timer = setTimeout(() => { isTimeout = true; @@ -160,7 +160,7 @@ export class ConsumeMessage { else this.messageHandler.emit( 'messageAcknowledged', - msg.getRequiredId(), + msg.getId(), msg.getDestinationQueue(), this.messageHandler.getId(), this.messageHandler.getConsumerId(), @@ -169,13 +169,7 @@ export class ConsumeMessage { } } }; - - // As a safety measure, in case if we mess with message system - // properties, only a clone of the message is actually given - this.messageHandler.getHandler()( - _fromMessage(msg, msg.getStatus(), msg.getMessageState()), - onConsumed, - ); + this.messageHandler.getHandler()(_createRMessage(msg), onConsumed); } catch (error: unknown) { this.logger.error(error); this.unacknowledgeMessage( diff --git a/src/lib/consumer/message-handler/processing-queue.ts b/src/lib/consumer/message-handler/processing-queue.ts index 0d4160d9..32d49daf 100644 --- a/src/lib/consumer/message-handler/processing-queue.ts +++ b/src/lib/consumer/message-handler/processing-queue.ts @@ -115,7 +115,7 @@ export const processingQueue = { if (err) done(err); else { if (message) { - const messageId = message.getRequiredId(); + const messageId = message.getId(); args.push(messageId); const { keyMessage } = redisKeys.getMessageKeys(messageId); diff --git a/src/lib/consumer/message-handler/retry-message.ts b/src/lib/consumer/message-handler/retry-message.ts index c19f2a00..bd319927 100644 --- a/src/lib/consumer/message-handler/retry-message.ts +++ b/src/lib/consumer/message-handler/retry-message.ts @@ -55,7 +55,7 @@ export function getRetryAction( deadLetterCause: EConsumeMessageDeadLetterCause.RETRY_THRESHOLD_EXCEEDED, }; } - const delay = message.getRetryDelay(); + const delay = message.producibleMessage.getRetryDelay(); if (delay) { return { action: ERetryAction.DELAY, diff --git a/src/lib/message/_create-r-message.ts b/src/lib/message/_create-r-message.ts new file mode 100644 index 00000000..0ae7aede --- /dev/null +++ b/src/lib/message/_create-r-message.ts @@ -0,0 +1,115 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { MessageEnvelope } from './message-envelope'; +import { + EMessagePriority, + EMessagePropertyStatus, + IMessageSerialized, + IConsumableMessage, + IQueueParams, + TExchange, + TTopicParams, +} from '../../../types'; + +export function _createRMessage(msg: MessageEnvelope): IConsumableMessage { + return { + getPublishedAt(): number | null { + return msg.getPublishedAt(); + }, + + getScheduledAt(): number | null { + return msg.getScheduledAt(); + }, + + getScheduledMessageId(): string | null { + return msg.getScheduledMessageId(); + }, + + getId(): string { + return msg.getId(); + }, + + getStatus(): EMessagePropertyStatus { + return msg.getStatus(); + }, + + hasPriority(): boolean { + return msg.producibleMessage.hasPriority(); + }, + + getQueue(): IQueueParams | string | null { + return msg.producibleMessage.getQueue(); + }, + + getDestinationQueue(): IQueueParams { + return msg.getDestinationQueue(); + }, + + getPriority(): EMessagePriority | null { + return msg.producibleMessage.getPriority(); + }, + + getBody(): unknown { + return msg.producibleMessage.getBody(); + }, + + getTTL(): number { + return msg.producibleMessage.getTTL(); + }, + + getRetryThreshold(): number { + return msg.producibleMessage.getRetryThreshold(); + }, + + getRetryDelay(): number { + return msg.producibleMessage.getRetryDelay(); + }, + + getConsumeTimeout(): number { + return msg.producibleMessage.getConsumeTimeout(); + }, + + getCreatedAt(): number { + return msg.producibleMessage.getCreatedAt(); + }, + + getScheduledRepeat(): number { + return msg.producibleMessage.getScheduledRepeat(); + }, + + getScheduledRepeatPeriod(): number | null { + return msg.producibleMessage.getScheduledRepeatPeriod(); + }, + + getScheduledCRON(): string | null { + return msg.producibleMessage.getScheduledCRON(); + }, + + getScheduledDelay(): number | null { + return msg.producibleMessage.getScheduledDelay(); + }, + + getFanOut(): string | null { + return msg.producibleMessage.getFanOut(); + }, + + getTopic(): TTopicParams | string | null { + return msg.producibleMessage.getTopic(); + }, + + toJSON(): IMessageSerialized { + return msg.toJSON(); + }, + + getExchange(): TExchange { + return msg.getExchange(); + }, + }; +} diff --git a/src/lib/message/_delete-message.ts b/src/lib/message/_delete-message.ts index 11dc5bde..57879b40 100644 --- a/src/lib/message/_delete-message.ts +++ b/src/lib/message/_delete-message.ts @@ -22,7 +22,7 @@ import { } from '../../../types'; import { ELuaScriptName } from '../../common/redis-client/redis-client'; import { _getMessage } from './_get-message'; -import { MessageDeleteError } from './errors/message-delete.error'; +import { MessageDeleteError } from './errors'; export function _deleteMessage( redisClient: RedisClient, diff --git a/src/lib/message/_from-message.ts b/src/lib/message/_from-message.ts index 38679fce..2e0fc6d3 100644 --- a/src/lib/message/_from-message.ts +++ b/src/lib/message/_from-message.ts @@ -15,18 +15,23 @@ import { } from '../../../types'; import { MessageEnvelope } from './message-envelope'; import { _fromJSON } from '../exchange/_from-json'; +import { ProducibleMessage } from './producible-message'; export function _fromMessage( msg: string | MessageEnvelope, status: EMessagePropertyStatus | null, msgState: string | MessageState | null, ): MessageEnvelope { - const { exchange, ...params }: IMessageSerialized = + const { exchange, destinationQueue, ...params }: IMessageSerialized = typeof msg === 'string' ? JSON.parse(msg) : msg.toJSON(); - // Properties - const m = new MessageEnvelope(); - Object.assign(m, params); + const messagePub = new ProducibleMessage(); + Object.assign(messagePub, params); + messagePub.setExchange(_fromJSON(exchange)); + + // + const m = new MessageEnvelope(messagePub); + m.setDestinationQueue(destinationQueue); // Status if (status !== null) { @@ -42,8 +47,5 @@ export function _fromMessage( m.setMessageState(messageStateInstance); } - // Exchange - if (exchange) m.setExchange(_fromJSON(exchange)); - return m; } diff --git a/src/lib/message/_get-message-state.ts b/src/lib/message/_get-message-state.ts new file mode 100644 index 00000000..2384bfdb --- /dev/null +++ b/src/lib/message/_get-message-state.ts @@ -0,0 +1,26 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { ICallback, RedisClient } from 'redis-smq-common'; +import { redisKeys } from '../../common/redis-keys/redis-keys'; +import { EMessageProperty, IMessageStateSerialized } from '../../../types'; +import { MessageNotFoundError } from './errors'; + +export function _getMessageState( + redisClient: RedisClient, + messageId: string, + cb: ICallback, +): void { + const { keyMessage } = redisKeys.getMessageKeys(messageId); + redisClient.hget(keyMessage, String(EMessageProperty.STATE), (err, reply) => { + if (err) cb(err); + else if (!reply) cb(new MessageNotFoundError()); + else cb(null, JSON.parse(reply)); + }); +} diff --git a/src/lib/message/message-envelope.ts b/src/lib/message/message-envelope.ts index 3778d477..9192227f 100644 --- a/src/lib/message/message-envelope.ts +++ b/src/lib/message/message-envelope.ts @@ -9,83 +9,36 @@ import { parseExpression } from 'cron-parser'; import { - EMessagePriority, EMessagePropertyStatus, IMessageSerialized, IQueueParams, TExchange, - TMessageConsumeOptions, - TTopicParams, } from '../../../types'; import { MessageState } from './message-state'; import { MessageDestinationQueueAlreadySetError, MessageDestinationQueueRequiredError, - MessageError, MessageExchangeRequiredError, } from './errors'; -import { ExchangeDirect } from '../exchange/exchange-direct'; -import { ExchangeFanOut } from '../exchange/exchange-fan-out'; -import { ExchangeTopic } from '../exchange/exchange-topic'; +import { ProducibleMessage } from './producible-message'; export class MessageEnvelope { - protected static defaultConsumeOptions: TMessageConsumeOptions = { - ttl: 0, - retryThreshold: 3, - retryDelay: 60000, - consumeTimeout: 0, - }; + protected messageState: MessageState; - protected readonly createdAt: number; - - protected ttl = 0; - - protected retryThreshold = 3; - - protected retryDelay = 60000; - - protected consumeTimeout = 0; - - protected body: unknown = null; - - protected priority: EMessagePriority | null = null; - - protected scheduledCron: string | null = null; - - protected scheduledDelay: number | null = null; - - protected scheduledRepeatPeriod: number | null = null; - - protected scheduledRepeat = 0; - - protected messageState: MessageState | null = null; - - protected exchange: TExchange | null = null; + protected status: EMessagePropertyStatus = EMessagePropertyStatus.UNPUBLISHED; protected destinationQueue: IQueueParams | null = null; - protected status: EMessagePropertyStatus = EMessagePropertyStatus.UNPUBLISHED; - - constructor() { - this.createdAt = Date.now(); - const { consumeTimeout, retryDelay, ttl, retryThreshold } = - MessageEnvelope.defaultConsumeOptions; - this.setConsumeTimeout(consumeTimeout); - this.setRetryDelay(retryDelay); - this.setTTL(ttl); - this.setRetryThreshold(retryThreshold); - } + readonly producibleMessage; - getMessageState(): MessageState | null { - return this.messageState; + constructor(producibleMessage: ProducibleMessage) { + this.producibleMessage = producibleMessage; + this.messageState = new MessageState(); + const scheduledDelay = this.producibleMessage.getScheduledDelay(); + if (scheduledDelay) this.messageState.setNextScheduledDelay(scheduledDelay); } - getRequiredMessageState(): MessageState { - if (!this.messageState) { - throw new MessageError( - `Expected an instance of MessageState. Probably the message has not yet been published`, - ); - } + getMessageState(): MessageState { return this.messageState; } @@ -94,18 +47,6 @@ export class MessageEnvelope { return this; } - getSetMessageState(): MessageState { - if (!this.messageState) { - const m = new MessageState(); - if (this.scheduledDelay) m.setNextScheduledDelay(this.scheduledDelay); - this.setMessageState(m); - return m; - } - return this.messageState; - } - - /// - getPublishedAt(): number | null { if (this.messageState) { return this.messageState.getPublishedAt(); @@ -127,24 +68,14 @@ export class MessageEnvelope { return null; } - getId(): string | null { - if (this.messageState) { - return this.messageState.getId(); - } - return null; - } - - getRequiredId(): string { - if (!this.messageState) { - throw new MessageError(`Message has not yet been published`); - } + getId(): string { return this.messageState.getId(); } getSetExpired(): boolean { - return this.getRequiredMessageState().getSetExpired( - this.getTTL(), - this.getCreatedAt(), + return this.getMessageState().getSetExpired( + this.producibleMessage.getTTL(), + this.producibleMessage.getCreatedAt(), ); } @@ -152,125 +83,6 @@ export class MessageEnvelope { return this.status; } - /// - - setExchange(exchange: TExchange): MessageEnvelope { - this.exchange = exchange; - return this; - } - - /** - * @param period In millis - */ - setScheduledRepeatPeriod(period: number): MessageEnvelope { - // JavaScript users do not have type checking - // So just make sure that we have an integer value - const value = Number(period); - if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer value in milliseconds', - ); - } - this.scheduledRepeatPeriod = value; - return this; - } - - /** - * @param delay In millis - */ - setScheduledDelay(delay: number): MessageEnvelope { - // JavaScript users do not have type checking - // So just make sure that we have an integer value - const value = Number(delay); - if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer value in milliseconds', - ); - } - this.scheduledDelay = value; - return this; - } - - setScheduledCRON(cron: string): MessageEnvelope { - // it throws an exception for an invalid value - parseExpression(cron); - this.scheduledCron = cron; - return this; - } - - setScheduledRepeat(repeat: number): MessageEnvelope { - // JavaScript users do not have type checking - // So just make sure that we have an integer value - const value = Number(repeat); - if (isNaN(value) || value < 0) { - throw new MessageError('Expected a positive integer value >= 0'); - } - this.scheduledRepeat = value; - return this; - } - - resetScheduledParams(): MessageEnvelope { - this.scheduledCron = null; - this.scheduledDelay = null; - this.scheduledRepeatPeriod = null; - this.scheduledRepeat = 0; - return this; - } - - /** - * @param ttl In milliseconds - */ - setTTL(ttl: number): MessageEnvelope { - this.ttl = MessageEnvelope.validateTTL(ttl); - return this; - } - - /** - * @param timeout In milliseconds - */ - setConsumeTimeout(timeout: number): MessageEnvelope { - this.consumeTimeout = MessageEnvelope.validateConsumeTimeout(timeout); - return this; - } - - setRetryThreshold(threshold: number): MessageEnvelope { - this.retryThreshold = MessageEnvelope.validateRetryThreshold(threshold); - return this; - } - - /** - * @param delay In millis - */ - setRetryDelay(delay: number): MessageEnvelope { - this.retryDelay = MessageEnvelope.validateRetryDelay(delay); - return this; - } - - setBody(body: unknown): MessageEnvelope { - this.body = body; - return this; - } - - setPriority(priority: EMessagePriority): MessageEnvelope { - this.priority = priority; - return this; - } - - setFanOut(bindingKey: string): MessageEnvelope { - this.exchange = new ExchangeFanOut(bindingKey); - return this; - } - - setTopic(topicParams: string | TTopicParams): MessageEnvelope { - this.exchange = new ExchangeTopic(topicParams); - return this; - } - - setQueue(queueParams: string | IQueueParams): MessageEnvelope { - this.exchange = new ExchangeDirect(queueParams); - return this; - } - setDestinationQueue(queue: IQueueParams): MessageEnvelope { if (this.destinationQueue !== null) { throw new MessageDestinationQueueAlreadySetError(); @@ -279,27 +91,11 @@ export class MessageEnvelope { return this; } - disablePriority(): MessageEnvelope { - this.priority = null; - return this; - } - - hasPriority(): boolean { - return this.priority !== null; - } - setStatus(s: EMessagePropertyStatus): MessageEnvelope { this.status = s; return this; } - getQueue(): IQueueParams | string | null { - if (this.exchange instanceof ExchangeDirect) { - return this.exchange.getBindingParams(); - } - return null; - } - getDestinationQueue(): IQueueParams { if (!this.destinationQueue) { throw new MessageDestinationQueueRequiredError(); @@ -307,60 +103,16 @@ export class MessageEnvelope { return this.destinationQueue; } - getPriority(): EMessagePriority | null { - return this.priority; - } - - getBody(): unknown { - return this.body; - } - - getTTL(): number { - return this.ttl; - } - - getRetryThreshold(): number { - return this.retryThreshold; - } - - getRetryDelay(): number { - return this.retryDelay; - } - - getConsumeTimeout(): number { - return this.consumeTimeout; - } - - getCreatedAt(): number { - return this.createdAt; - } - - getScheduledRepeat(): number { - return this.scheduledRepeat; - } - - getScheduledRepeatPeriod(): number | null { - return this.scheduledRepeatPeriod; - } - - getScheduledCRON(): string | null { - return this.scheduledCron; - } - - getMessageScheduledDelay(): number | null { - return this.scheduledDelay; - } - hasNextDelay(): boolean { if (this.messageState) { return this.messageState.hasDelay(); } - return !!this.getMessageScheduledDelay(); + return !!this.producibleMessage.getScheduledDelay(); } getNextScheduledTimestamp(): number { if (this.isSchedulable()) { - const messageState = this.getRequiredMessageState(); + const messageState = this.getMessageState(); // Delay const delay = messageState.getSetNextDelay(); @@ -369,18 +121,19 @@ export class MessageEnvelope { } // CRON - const msgScheduledCron = this.getScheduledCRON(); + const msgScheduledCron = this.producibleMessage.getScheduledCRON(); const cronTimestamp = msgScheduledCron ? parseExpression(msgScheduledCron).next().getTime() : 0; // Repeat - const msgScheduledRepeat = this.getScheduledRepeat(); + const msgScheduledRepeat = this.producibleMessage.getScheduledRepeat(); let repeatTimestamp = 0; if (msgScheduledRepeat) { const newCount = messageState.getMessageScheduledRepeatCount() + 1; if (newCount <= msgScheduledRepeat) { - const scheduledRepeatPeriod = this.getScheduledRepeatPeriod(); + const scheduledRepeatPeriod = + this.producibleMessage.getScheduledRepeatPeriod(); const now = Date.now(); if (scheduledRepeatPeriod) { repeatTimestamp = now + scheduledRepeatPeriod; @@ -419,15 +172,12 @@ export class MessageEnvelope { return 0; } - getExchange(): TExchange | null { - return this.exchange; - } - - getRequiredExchange(): TExchange { - if (!this.exchange) { + getExchange(): TExchange { + const exchange = this.producibleMessage.getExchange(); + if (!exchange) { throw new MessageExchangeRequiredError(); } - return this.exchange; + return exchange; } toString(): string { @@ -436,19 +186,19 @@ export class MessageEnvelope { toJSON(): IMessageSerialized { return { - createdAt: this.createdAt, - ttl: this.ttl, - retryThreshold: this.retryThreshold, - retryDelay: this.retryDelay, - consumeTimeout: this.consumeTimeout, - body: this.body, - priority: this.priority, - scheduledCron: this.scheduledCron, - scheduledDelay: this.scheduledDelay, - scheduledRepeatPeriod: this.scheduledRepeatPeriod, - scheduledRepeat: this.scheduledRepeat, - exchange: this.exchange ? this.exchange.toJSON() : null, - destinationQueue: this.destinationQueue, + createdAt: this.producibleMessage.getCreatedAt(), + ttl: this.producibleMessage.getTTL(), + retryThreshold: this.producibleMessage.getRetryThreshold(), + retryDelay: this.producibleMessage.getRetryDelay(), + consumeTimeout: this.producibleMessage.getConsumeTimeout(), + body: this.producibleMessage.getBody(), + priority: this.producibleMessage.getPriority(), + scheduledCron: this.producibleMessage.getScheduledCRON(), + scheduledDelay: this.producibleMessage.getScheduledDelay(), + scheduledRepeatPeriod: this.producibleMessage.getScheduledRepeatPeriod(), + scheduledRepeat: this.producibleMessage.getScheduledRepeat(), + exchange: this.getExchange().toJSON(), + destinationQueue: this.getDestinationQueue(), }; } @@ -457,7 +207,7 @@ export class MessageEnvelope { if (!messageState) { return false; } - const threshold = this.getRetryThreshold(); + const threshold = this.producibleMessage.getRetryThreshold(); return messageState.getAttempts() + 1 >= threshold; } @@ -466,72 +216,9 @@ export class MessageEnvelope { } isPeriodic(): boolean { - return this.getScheduledCRON() !== null || this.getScheduledRepeat() > 0; - } - - protected static validateRetryDelay(delay: number): number { - const value = Number(delay); - if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer in milliseconds >= 0', - ); - } - return value; - } - protected static validateTTL(ttl: unknown): number { - const value = Number(ttl); - if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer value in milliseconds >= 0', - ); - } - return value; - } - - protected static validateConsumeTimeout(timeout: unknown): number { - const value = Number(timeout); - if (isNaN(value) || value < 0) { - throw new MessageError( - 'Expected a positive integer value in milliseconds >= 0', - ); - } - return value; - } - - protected static validateRetryThreshold(threshold: unknown): number { - const value = Number(threshold); - if (isNaN(value) || value < 0) { - throw new MessageError( - 'Retry threshold should be a positive integer >= 0', - ); - } - return value; - } - - static setDefaultConsumeOptions( - consumeOptions: Partial, - ): void { - const { - ttl = null, - retryThreshold = null, - retryDelay = null, - consumeTimeout = null, - } = consumeOptions; - - if (ttl !== null) - MessageEnvelope.defaultConsumeOptions.ttl = - MessageEnvelope.validateTTL(ttl); - - if (retryDelay !== null) - MessageEnvelope.defaultConsumeOptions.retryDelay = - MessageEnvelope.validateRetryDelay(retryDelay); - - if (retryThreshold !== null) - MessageEnvelope.defaultConsumeOptions.retryThreshold = - MessageEnvelope.validateRetryThreshold(retryThreshold); - - if (consumeTimeout !== null) - MessageEnvelope.defaultConsumeOptions.consumeTimeout = - MessageEnvelope.validateConsumeTimeout(consumeTimeout); + return ( + this.producibleMessage.getScheduledCRON() !== null || + this.producibleMessage.getScheduledRepeat() > 0 + ); } } diff --git a/src/lib/message/message.ts b/src/lib/message/message.ts index 365bd6f1..a4da3247 100644 --- a/src/lib/message/message.ts +++ b/src/lib/message/message.ts @@ -11,9 +11,14 @@ import { CallbackEmptyReplyError, ICallback } from 'redis-smq-common'; import { _getCommonRedisClient } from '../../common/_get-common-redis-client'; import { _getMessage, _getMessages } from './_get-message'; import { _deleteMessage } from './_delete-message'; -import { MessageEnvelope } from './message-envelope'; -import { EMessagePropertyStatus } from '../../../types'; +import { + EMessagePropertyStatus, + IMessageStateSerialized, + IConsumableMessage, +} from '../../../types'; import { _getMessageStatus } from './_get-message-status'; +import { _createRMessage } from './_create-r-message'; +import { _getMessageState } from './_get-message-state'; export class Message { getMessageStatus( @@ -27,22 +32,48 @@ export class Message { }); } + getMessageState( + messageId: string, + cb: ICallback, + ): void { + _getCommonRedisClient((err, client) => { + if (err) cb(err); + else if (!client) cb(new CallbackEmptyReplyError()); + else _getMessageState(client, messageId, cb); + }); + } + getMessagesByIds( messageIds: string[], - cb: ICallback, + cb: ICallback, ): void { _getCommonRedisClient((err, client) => { if (err) cb(err); else if (!client) cb(new CallbackEmptyReplyError()); - else _getMessages(client, messageIds, cb); + else + _getMessages(client, messageIds, (err, reply) => { + if (err) cb(err); + else if (!reply) cb(new CallbackEmptyReplyError()); + else { + cb( + null, + reply.map((i) => _createRMessage(i)), + ); + } + }); }); } - getMessageById(messageId: string, cb: ICallback): void { + getMessageById(messageId: string, cb: ICallback): void { _getCommonRedisClient((err, client) => { if (err) cb(err); else if (!client) cb(new CallbackEmptyReplyError()); - else _getMessage(client, messageId, cb); + else + _getMessage(client, messageId, (err, reply) => { + if (err) cb(err); + else if (!reply) cb(new CallbackEmptyReplyError()); + else cb(null, _createRMessage(reply)); + }); }); } diff --git a/src/lib/message/producible-message.ts b/src/lib/message/producible-message.ts new file mode 100644 index 00000000..d0e4dc27 --- /dev/null +++ b/src/lib/message/producible-message.ts @@ -0,0 +1,325 @@ +/* + * Copyright (c) + * Weyoss + * https://github.com/weyoss + * + * This source code is licensed under the MIT license found in the LICENSE file + * in the root directory of this source tree. + */ + +import { + EMessagePriority, + IQueueParams, + TExchange, + TMessageConsumeOptions, + TTopicParams, +} from '../../../types'; +import { MessageError } from './errors'; +import { parseExpression } from 'cron-parser'; +import { ExchangeFanOut } from '../exchange/exchange-fan-out'; +import { ExchangeTopic } from '../exchange/exchange-topic'; +import { ExchangeDirect } from '../exchange/exchange-direct'; + +export class ProducibleMessage { + protected static defaultConsumeOptions: TMessageConsumeOptions = { + ttl: 0, + retryThreshold: 3, + retryDelay: 60000, + consumeTimeout: 0, + }; + + protected readonly createdAt: number; + + protected ttl = 0; + + protected retryThreshold = 3; + + protected retryDelay = 60000; + + protected consumeTimeout = 0; + + protected body: unknown = null; + + protected priority: EMessagePriority | null = null; + + protected scheduledCron: string | null = null; + + protected scheduledDelay: number | null = null; + + protected scheduledRepeatPeriod: number | null = null; + + protected scheduledRepeat = 0; + + protected exchange: TExchange | null = null; + + constructor() { + this.createdAt = Date.now(); + const { consumeTimeout, retryDelay, ttl, retryThreshold } = + ProducibleMessage.defaultConsumeOptions; + this.setConsumeTimeout(consumeTimeout); + this.setRetryDelay(retryDelay); + this.setTTL(ttl); + this.setRetryThreshold(retryThreshold); + } + + getCreatedAt(): number { + return this.createdAt; + } + + /** + * @param period In millis + */ + setScheduledRepeatPeriod(period: number): ProducibleMessage { + // JavaScript users do not have type checking + // So just make sure that we have an integer value + const value = Number(period); + if (isNaN(value) || value < 0) { + throw new MessageError( + 'Expected a positive integer value in milliseconds', + ); + } + this.scheduledRepeatPeriod = value; + return this; + } + + /** + * @param delay In millis + */ + setScheduledDelay(delay: number): ProducibleMessage { + // JavaScript users do not have type checking + // So just make sure that we have an integer value + const value = Number(delay); + if (isNaN(value) || value < 0) { + throw new MessageError( + 'Expected a positive integer value in milliseconds', + ); + } + this.scheduledDelay = value; + return this; + } + + getScheduledDelay(): number | null { + return this.scheduledDelay; + } + + setScheduledCRON(cron: string): ProducibleMessage { + // it throws an exception for an invalid value + parseExpression(cron); + this.scheduledCron = cron; + return this; + } + + setScheduledRepeat(repeat: number): ProducibleMessage { + // JavaScript users do not have type checking + // So just make sure that we have an integer value + const value = Number(repeat); + if (isNaN(value) || value < 0) { + throw new MessageError('Expected a positive integer value >= 0'); + } + this.scheduledRepeat = value; + return this; + } + + resetScheduledParams(): ProducibleMessage { + this.scheduledCron = null; + this.scheduledDelay = null; + this.scheduledRepeatPeriod = null; + this.scheduledRepeat = 0; + return this; + } + + /** + * @param ttl In milliseconds + */ + setTTL(ttl: number): ProducibleMessage { + this.ttl = ProducibleMessage.validateTTL(ttl); + return this; + } + + /** + * @param timeout In milliseconds + */ + setConsumeTimeout(timeout: number): ProducibleMessage { + this.consumeTimeout = ProducibleMessage.validateConsumeTimeout(timeout); + return this; + } + + setRetryThreshold(threshold: number): ProducibleMessage { + this.retryThreshold = ProducibleMessage.validateRetryThreshold(threshold); + return this; + } + + /** + * @param delay In millis + */ + setRetryDelay(delay: number): ProducibleMessage { + this.retryDelay = ProducibleMessage.validateRetryDelay(delay); + return this; + } + + setBody(body: unknown): ProducibleMessage { + this.body = body; + return this; + } + + setPriority(priority: EMessagePriority): ProducibleMessage { + this.priority = priority; + return this; + } + + hasPriority(): boolean { + return this.priority !== null; + } + + disablePriority(): ProducibleMessage { + this.priority = null; + return this; + } + + setFanOut(fanOutName: string): ProducibleMessage { + this.exchange = new ExchangeFanOut(fanOutName); + return this; + } + + setTopic(topicParams: string | TTopicParams): ProducibleMessage { + this.exchange = new ExchangeTopic(topicParams); + return this; + } + + setQueue(queueParams: string | IQueueParams): ProducibleMessage { + this.exchange = new ExchangeDirect(queueParams); + return this; + } + + getQueue(): IQueueParams | string | null { + if (this.exchange && this.exchange instanceof ExchangeDirect) { + return this.exchange.getBindingParams(); + } + return null; + } + + getTopic(): TTopicParams | string | null { + if (this.exchange && this.exchange instanceof ExchangeTopic) { + return this.exchange.getBindingParams(); + } + return null; + } + + getFanOut(): string | null { + if (this.exchange && this.exchange instanceof ExchangeFanOut) { + return this.exchange.getBindingParams(); + } + return null; + } + + setExchange(exchange: TExchange): ProducibleMessage { + this.exchange = exchange; + return this; + } + + getExchange(): TExchange | null { + return this.exchange; + } + + getScheduledRepeatPeriod(): number | null { + return this.scheduledRepeatPeriod; + } + + getScheduledCRON(): string | null { + return this.scheduledCron; + } + + getScheduledRepeat(): number { + return this.scheduledRepeat; + } + + getTTL(): number { + return this.ttl; + } + + getRetryThreshold(): number { + return this.retryThreshold; + } + + getRetryDelay(): number { + return this.retryDelay; + } + + getConsumeTimeout(): number { + return this.consumeTimeout; + } + + getPriority(): EMessagePriority | null { + return this.priority; + } + + getBody(): unknown { + return this.body; + } + + protected static validateRetryDelay(delay: number): number { + const value = Number(delay); + if (isNaN(value) || value < 0) { + throw new MessageError( + 'Expected a positive integer in milliseconds >= 0', + ); + } + return value; + } + protected static validateTTL(ttl: unknown): number { + const value = Number(ttl); + if (isNaN(value) || value < 0) { + throw new MessageError( + 'Expected a positive integer value in milliseconds >= 0', + ); + } + return value; + } + + protected static validateConsumeTimeout(timeout: unknown): number { + const value = Number(timeout); + if (isNaN(value) || value < 0) { + throw new MessageError( + 'Expected a positive integer value in milliseconds >= 0', + ); + } + return value; + } + + protected static validateRetryThreshold(threshold: unknown): number { + const value = Number(threshold); + if (isNaN(value) || value < 0) { + throw new MessageError( + 'Retry threshold should be a positive integer >= 0', + ); + } + return value; + } + + static setDefaultConsumeOptions( + consumeOptions: Partial, + ): void { + const { + ttl = null, + retryThreshold = null, + retryDelay = null, + consumeTimeout = null, + } = consumeOptions; + + if (ttl !== null) + ProducibleMessage.defaultConsumeOptions.ttl = + ProducibleMessage.validateTTL(ttl); + + if (retryDelay !== null) + ProducibleMessage.defaultConsumeOptions.retryDelay = + ProducibleMessage.validateRetryDelay(retryDelay); + + if (retryThreshold !== null) + ProducibleMessage.defaultConsumeOptions.retryThreshold = + ProducibleMessage.validateRetryThreshold(retryThreshold); + + if (consumeTimeout !== null) + ProducibleMessage.defaultConsumeOptions.consumeTimeout = + ProducibleMessage.validateConsumeTimeout(consumeTimeout); + } +} diff --git a/src/lib/producer/_schedule-message.ts b/src/lib/producer/_schedule-message.ts index d9a25a7b..41942bac 100644 --- a/src/lib/producer/_schedule-message.ts +++ b/src/lib/producer/_schedule-message.ts @@ -34,8 +34,8 @@ export function _scheduleMessage( keyQueueMessages, } = redisKeys.getQueueKeys(queue); const ts = Date.now(); - message.getRequiredMessageState().setScheduledAt(ts).setLastScheduledAt(ts); - const messageId = message.getRequiredId(); + message.getMessageState().setScheduledAt(ts).setLastScheduledAt(ts); + const messageId = message.getId(); const { keyMessage } = redisKeys.getMessageKeys(messageId); mixed.runScript( ELuaScriptName.SCHEDULE_MESSAGE, diff --git a/src/lib/producer/errors/index.ts b/src/lib/producer/errors/index.ts index 8609e08b..d9828ed1 100644 --- a/src/lib/producer/errors/index.ts +++ b/src/lib/producer/errors/index.ts @@ -7,7 +7,6 @@ * in the root directory of this source tree. */ -export { ProducerMessageAlreadyPublishedError } from './producer-message-already-published.error'; export { ProducerMessageNotPublishedError } from './producer-message-not-published.error'; export { ProducerMessageNotScheduledError } from './producer-message-not-scheduled.error'; export { ProducerInstanceNotRunningError } from './producer-instance-not-running.error'; diff --git a/src/lib/producer/errors/producer-message-already-published.error.ts b/src/lib/producer/errors/producer-message-already-published.error.ts deleted file mode 100644 index 53b1b58d..00000000 --- a/src/lib/producer/errors/producer-message-already-published.error.ts +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) - * Weyoss - * https://github.com/weyoss - * - * This source code is licensed under the MIT license found in the LICENSE file - * in the root directory of this source tree. - */ - -import { ProducerError } from './producer.error'; - -export class ProducerMessageAlreadyPublishedError extends ProducerError { - constructor( - msg = 'The message can not published. Either you have already published the message or you have called the getSetMessageState() method.', - ) { - super(msg); - } -} diff --git a/src/lib/producer/producer.ts b/src/lib/producer/producer.ts index 151a8c6b..4ab2903b 100644 --- a/src/lib/producer/producer.ts +++ b/src/lib/producer/producer.ts @@ -18,7 +18,6 @@ import { } from 'redis-smq-common'; import { redisKeys } from '../../common/redis-keys/redis-keys'; import { ProducerMessageNotPublishedError } from './errors'; -import { ProducerMessageAlreadyPublishedError } from './errors'; import { ELuaScriptName } from '../../common/redis-client/redis-client'; import { _scheduleMessage } from './_schedule-message'; import { ProducerInstanceNotRunningError } from './errors'; @@ -31,8 +30,8 @@ import { } from '../../../types'; import { ExchangeDirect } from '../exchange/exchange-direct'; import { _getQueueParams } from '../queue/queue/_get-queue-params'; -import { _fromMessage } from '../message/_from-message'; import { Configuration } from '../../config/configuration'; +import { ProducibleMessage } from '../message/producible-message'; export class Producer extends Base { protected initProducerEventListeners = (cb: ICallback): void => { @@ -52,9 +51,9 @@ export class Producer extends Base { message: MessageEnvelope, cb: ICallback, ): void { - const messageState = message.getRequiredMessageState(); + const messageState = message.getMessageState(); messageState.setPublishedAt(Date.now()); - const messageId = message.getRequiredId(); + const messageId = message.getId(); const { keyQueueProperties, keyPriorityQueuePending, @@ -76,7 +75,7 @@ export class Producer extends Base { EQueueType.PRIORITY_QUEUE, EQueueType.LIFO_QUEUE, EQueueType.FIFO_QUEUE, - message.getPriority() ?? '', + message.producibleMessage.getPriority() ?? '', keyQueueMessages, // Passing as argument. Final key will be computed dynamically messageId, EMessageProperty.STATUS, @@ -107,7 +106,7 @@ export class Producer extends Base { ): void { const messageId = message .setDestinationQueue(queue) - .getSetMessageState() + .getMessageState() .getId(); if (message.isSchedulable()) _scheduleMessage(redisClient, message, (err) => { @@ -128,69 +127,52 @@ export class Producer extends Base { }); } - produce( - message: MessageEnvelope, - cb: ICallback<{ - messages: string[]; - scheduled: boolean; - }>, - ): void { + produce(msg: ProducibleMessage, cb: ICallback): void { if (!this.powerSwitch.isUp()) cb(new ProducerInstanceNotRunningError()); else { - if (message.getMessageState()) - cb(new ProducerMessageAlreadyPublishedError()); - else { - const callback: ICallback = (err, messages) => { + const callback: ICallback = (err, messages) => { + if (err) cb(err); + else if (!messages) cb(new CallbackEmptyReplyError()); + else { + cb(null, typeof messages === 'string' ? [messages] : messages); + } + }; + const redisClient = this.getSharedRedisClient(); + const envelope = new MessageEnvelope(msg); + const exchange = envelope.getExchange(); + if (exchange instanceof ExchangeDirect) { + const queue = _getQueueParams(exchange.getBindingParams()); + this.produceMessage(redisClient, envelope, queue, callback); + } else { + exchange.getQueues((err, queues) => { if (err) cb(err); - else if (!messages) cb(new CallbackEmptyReplyError()); + else if (!queues?.length) + cb( + new ProducerMessageNotPublishedError( + `The exchange (${exchange.constructor.name}) does not match any queue.`, + ), + ); else { - cb(null, { - scheduled: false, - messages: typeof messages === 'string' ? [messages] : messages, - }); + const messages: string[] = []; + async.eachOf( + queues, + (queue, index, done) => { + const m = new MessageEnvelope(msg); + this.produceMessage(redisClient, m, queue, (err, messageId) => { + if (err) done(err); + else { + messageId && messages.push(messageId); + done(); + } + }); + }, + (err) => { + if (err) callback(err); + else callback(null, messages); + }, + ); } - }; - const redisClient = this.getSharedRedisClient(); - const exchange = message.getRequiredExchange(); - if (exchange instanceof ExchangeDirect) { - const queue = _getQueueParams(exchange.getBindingParams()); - this.produceMessage(redisClient, message, queue, callback); - } else { - exchange.getQueues((err, queues) => { - if (err) cb(err); - else if (!queues?.length) - cb( - new ProducerMessageNotPublishedError( - `The exchange (${exchange.constructor.name}) does not match any queue.`, - ), - ); - else { - const messages: string[] = []; - async.eachOf( - queues, - (queue, index, done) => { - const msg = _fromMessage(message, null, null); - this.produceMessage( - redisClient, - msg, - queue, - (err, messageId) => { - if (err) done(err); - else { - messageId && messages.push(messageId); - done(); - } - }, - ); - }, - (err) => { - if (err) callback(err); - else callback(null, messages); - }, - ); - } - }); - } + }); } } } diff --git a/src/lib/queue/queue-messages-paginator/queue-messages-paginator-abstract.ts b/src/lib/queue/queue-messages-paginator/queue-messages-paginator-abstract.ts index ab5fcf51..66f3812e 100644 --- a/src/lib/queue/queue-messages-paginator/queue-messages-paginator-abstract.ts +++ b/src/lib/queue/queue-messages-paginator/queue-messages-paginator-abstract.ts @@ -12,12 +12,12 @@ import { IQueueMessagesPageParams, IQueueMessagesPage, IQueueParams, + IConsumableMessage, } from '../../../../types'; import { async, CallbackEmptyReplyError, ICallback } from 'redis-smq-common'; -import { MessageEnvelope } from '../../message/message-envelope'; import { _getCommonRedisClient } from '../../../common/_get-common-redis-client'; import { _deleteMessage } from '../../message/_delete-message'; -import { _getMessages } from '../../message/_get-message'; +import { Message } from '../../message/message'; export abstract class QueueMessagesPaginatorAbstract implements IQueueMessages { protected getTotalPages(pageSize: number, totalItems: number): number { @@ -93,7 +93,7 @@ export abstract class QueueMessagesPaginatorAbstract implements IQueueMessages { queue: string | IQueueParams, cursor: number, pageSize: number, - cb: ICallback>, + cb: ICallback>, ): void { this.getMessagesIds(queue, cursor, pageSize, (err, reply) => { if (err) cb(err); @@ -104,9 +104,12 @@ export abstract class QueueMessagesPaginatorAbstract implements IQueueMessages { if (err) cb(err); else if (!client) cb(new CallbackEmptyReplyError()); else { - _getMessages(client, reply.items, (err, messages) => { + const message = new Message(); + message.getMessagesByIds(reply.items, (err, items) => { if (err) cb(err); - else cb(null, { ...reply, items: messages ?? [] }); + else { + cb(null, { ...reply, items: items ?? [] }); + } }); } }); diff --git a/src/lib/queue/queue-messages-paginator/queue-messages-paginator-list.ts b/src/lib/queue/queue-messages-paginator/queue-messages-paginator-list.ts index a30fe32a..17074473 100644 --- a/src/lib/queue/queue-messages-paginator/queue-messages-paginator-list.ts +++ b/src/lib/queue/queue-messages-paginator/queue-messages-paginator-list.ts @@ -93,13 +93,13 @@ export abstract class QueueMessagesPaginatorList extends QueueMessagesPaginatorA else if (!message) cb(new CallbackEmptyReplyError()); else { const queue = message.getDestinationQueue(); - message.getRequiredMessageState().reset(); // resetting all system parameters + message.getMessageState().reset(); // resetting all system parameters const { keyQueueProperties, keyQueuePending, keyPriorityQueuePending, } = redisKeys.getQueueKeys(queue); - const messageId = message.getRequiredId(); + const messageId = message.getId(); const { keyMessage } = redisKeys.getMessageKeys(messageId); const sourceKey = this.getRedisKey(source); client.runScript( @@ -120,8 +120,8 @@ export abstract class QueueMessagesPaginatorList extends QueueMessagesPaginatorA EMessagePropertyStatus.PENDING, EMessageProperty.STATE, messageId, - message.getPriority() ?? '', - JSON.stringify(message.getRequiredMessageState()), + message.producibleMessage.getPriority() ?? '', + JSON.stringify(message.getMessageState()), ], (err, reply) => { if (err) cb(err); diff --git a/src/lib/queue/queue-pending-messages.ts b/src/lib/queue/queue-pending-messages.ts index e66973e1..ff6f799b 100644 --- a/src/lib/queue/queue-pending-messages.ts +++ b/src/lib/queue/queue-pending-messages.ts @@ -9,6 +9,7 @@ import { EQueueType, + IConsumableMessage, IQueueMessages, IQueueMessagesPage, IQueueParams, @@ -16,7 +17,6 @@ import { import { PriorityQueueMessages } from './priority-queue-messages'; import { LinearQueueMessages } from './linear-queue-messages'; import { CallbackEmptyReplyError, ICallback } from 'redis-smq-common'; -import { MessageEnvelope } from '../message/message-envelope'; import { _getQueueProperties } from './queue/_get-queue-properties'; import { _getQueueParams } from './queue/_get-queue-params'; import { _getCommonRedisClient } from '../../common/_get-common-redis-client'; @@ -63,7 +63,7 @@ export class QueuePendingMessages implements IQueueMessages { queue: string | IQueueParams, cursor: number, pageSize: number, - cb: ICallback>, + cb: ICallback>, ): void { this.getQueueImplementation(queue, (err, pendingMessages) => { if (err) cb(err); diff --git a/src/workers/delay-unacknowledged.worker.ts b/src/workers/delay-unacknowledged.worker.ts index a5a2be85..8c38b959 100644 --- a/src/workers/delay-unacknowledged.worker.ts +++ b/src/workers/delay-unacknowledged.worker.ts @@ -59,7 +59,7 @@ export class DelayUnacknowledgedWorker extends Worker { if (err) done(err); else if (!message) cb(new CallbackEmptyReplyError()); else { - const messageId = message.getRequiredId(); + const messageId = message.getId(); const queue = message.getDestinationQueue(); const { keyQueueProperties, @@ -74,8 +74,8 @@ export class DelayUnacknowledgedWorker extends Worker { keyQueueScheduled, ); args.push(messageId, ''); - const delay = message.getRetryDelay(); - const messageState = message.getRequiredMessageState(); + const delay = message.producibleMessage.getRetryDelay(); + const messageState = message.getMessageState(); messageState.incrAttempts(); messageState.setNextRetryDelay(delay); const timestamp = message.getNextScheduledTimestamp(); diff --git a/src/workers/publish-scheduled.worker.ts b/src/workers/publish-scheduled.worker.ts index fd272e91..d2a7b478 100644 --- a/src/workers/publish-scheduled.worker.ts +++ b/src/workers/publish-scheduled.worker.ts @@ -72,14 +72,14 @@ export class PublishScheduledWorker extends Worker { messages, (msg, _, done) => { const ts = Date.now(); - const messagePriority = msg.getPriority() ?? ''; + const messagePriority = msg.producibleMessage.getPriority() ?? ''; const queue = msg.getDestinationQueue(); const { keyMessage: keyScheduledMessage } = redisKeys.getMessageKeys( - msg.getRequiredId(), + msg.getId(), ); const nextScheduleTimestamp = msg.getNextScheduledTimestamp(); const scheduledMessageState = msg - .getRequiredMessageState() + .getMessageState() .setLastScheduledAt(ts); const { keyQueueProperties, @@ -95,16 +95,16 @@ export class PublishScheduledWorker extends Worker { let newKeyMessage: string = ''; const hasBeenUnacknowledged = - msg.getRetryDelay() > 0 && - msg.getRequiredMessageState().getAttempts() > 0; + msg.producibleMessage.getRetryDelay() > 0 && + msg.getMessageState().getAttempts() > 0; if (!hasBeenUnacknowledged) { newMessage = _fromMessage(msg, null, null); + newMessage.producibleMessage.resetScheduledParams(); newMessageState = newMessage - .resetScheduledParams() - .getSetMessageState() + .getMessageState() .setPublishedAt(ts) - .setScheduledMessageId(msg.getRequiredId()); + .setScheduledMessageId(msg.getId()); newMessageId = newMessageState.getId(); newKeyMessage = redisKeys.getMessageKeys(newMessageId).keyMessage; } @@ -123,7 +123,7 @@ export class PublishScheduledWorker extends Worker { newMessage ? JSON.stringify(newMessage) : '', newMessageState ? JSON.stringify(newMessageState) : '', messagePriority, - msg.getRequiredId(), + msg.getId(), nextScheduleTimestamp, JSON.stringify(scheduledMessageState), ); diff --git a/src/workers/requeue-unacknowledged.worker.ts b/src/workers/requeue-unacknowledged.worker.ts index 892c9d30..7f4f783f 100644 --- a/src/workers/requeue-unacknowledged.worker.ts +++ b/src/workers/requeue-unacknowledged.worker.ts @@ -58,9 +58,9 @@ export class RequeueUnacknowledgedWorker extends Worker { if (err) done(err); else if (!message) cb(new CallbackEmptyReplyError()); else { - const messageId = message.getRequiredId(); + const messageId = message.getId(); const queue = message.getDestinationQueue(); - const messageState = message.getRequiredMessageState(); + const messageState = message.getMessageState(); const { keyQueuePending, keyPriorityQueuePending, @@ -74,7 +74,8 @@ export class RequeueUnacknowledgedWorker extends Worker { keyMessage, ); messageState.incrAttempts(); - const messagePriority = message.getPriority() ?? ''; + const messagePriority = + message.producibleMessage.getPriority() ?? ''; argv.push( messageId, messagePriority, diff --git a/types/consumer/message-handler.ts b/types/consumer/message-handler.ts index 2f62dbb5..d579850b 100644 --- a/types/consumer/message-handler.ts +++ b/types/consumer/message-handler.ts @@ -7,13 +7,13 @@ * in the root directory of this source tree. */ -import { MessageEnvelope } from '../../src/lib/message/message-envelope'; import { ICallback } from 'redis-smq-common'; import { IQueueParams } from '../queue'; import { redisKeys } from '../../src/common/redis-keys/redis-keys'; +import { IConsumableMessage } from '../message'; export type TConsumerMessageHandler = ( - msg: MessageEnvelope, + msg: IConsumableMessage, cb: ICallback, ) => void; diff --git a/types/message/message.ts b/types/message/message.ts index 47e2c452..9c40cc3f 100644 --- a/types/message/message.ts +++ b/types/message/message.ts @@ -9,6 +9,8 @@ import { TExchangeSerialized } from '../index'; import { IQueueParams } from '../queue'; +import { MessageEnvelope } from '../../src/lib/message/message-envelope'; +import { ProducibleMessage } from '../../src/lib/message/producible-message'; export enum EMessagePriority { HIGHEST, @@ -41,7 +43,7 @@ export enum EMessagePropertyStatus { export interface IMessageSerialized { createdAt: number; - exchange: TExchangeSerialized | null; + exchange: TExchangeSerialized; ttl: number; retryThreshold: number; retryDelay: number; @@ -52,7 +54,7 @@ export interface IMessageSerialized { scheduledDelay: number | null; scheduledRepeatPeriod: number | null; scheduledRepeat: number; - destinationQueue: IQueueParams | null; + destinationQueue: IQueueParams; } export type TMessageConsumeOptions = { @@ -61,3 +63,36 @@ export type TMessageConsumeOptions = { retryDelay: number; consumeTimeout: number; }; + +export interface IConsumableMessage + extends Omit< + ProducibleMessage, + | 'setBody' + | 'setConsumeTimeout' + | 'setPriority' + | 'setTTL' + | 'setFanOut' + | 'setQueue' + | 'setTopic' + | 'setRetryDelay' + | 'setRetryThreshold' + | 'setScheduledRepeatPeriod' + | 'setScheduledRepeat' + | 'setScheduledCRON' + | 'setScheduledDelay' + | 'disablePriority' + | 'resetScheduledParams' + | 'setExchange' + | 'getExchange' + >, + Pick< + MessageEnvelope, + | 'getScheduledMessageId' + | 'getDestinationQueue' + | 'getStatus' + | 'getPublishedAt' + | 'getScheduledAt' + | 'getId' + | 'toJSON' + | 'getExchange' + > {} diff --git a/types/queue-messages/index.ts b/types/queue-messages/index.ts index 94a0bc76..95971c3a 100644 --- a/types/queue-messages/index.ts +++ b/types/queue-messages/index.ts @@ -8,8 +8,8 @@ */ import { ICallback } from 'redis-smq-common'; -import { MessageEnvelope } from '../../src/lib/message/message-envelope'; import { IQueueParams } from '../queue'; +import { IConsumableMessage } from '../message'; export interface IQueueMessages { countMessages(queue: string | IQueueParams, cb: ICallback): void; @@ -17,7 +17,7 @@ export interface IQueueMessages { queue: string | IQueueParams, page: number, pageSize: number, - cb: ICallback>, + cb: ICallback>, ): void; purge(queue: string | IQueueParams, cb: ICallback): void; }