Skip to content

Commit

Permalink
feat!: add message status, return message IDs for produced messages
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Nov 26, 2023
1 parent 4ac300e commit 56566bf
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 240 deletions.
4 changes: 3 additions & 1 deletion src/common/events/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import { events as baseEvents } from 'redis-smq-common';

export const events = {
...baseEvents,
MESSAGE_PUBLISHED: 'message_produced',
MESSAGE_PUBLISHED: 'message_published',
MESSAGE_NEXT: 'message_next',
MESSAGE_RECEIVED: 'message_received',
MESSAGE_ACKNOWLEDGED: 'message_acknowledged',
MESSAGE_UNACKNOWLEDGED: 'message_unacknowledged',
MESSAGE_DEAD_LETTERED: 'message_dead_lettered',
MESSAGE_DELAYED: 'message_delayed',
MESSAGE_REQUEUED: 'message_requeued',
};
61 changes: 54 additions & 7 deletions src/lib/consumer/message-handler/consume-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@

import { Message } from '../../message/message';
import {
EConsumeMessageUnacknowledgedCause,
EMessageProperty,
EMessagePropertyStatus,
EConsumeMessageUnacknowledgedCause,
} from '../../../../types';
import { events } from '../../../common/events/events';
import { redisKeys } from '../../../common/redis-keys/redis-keys';
import { MessageHandler } from './message-handler';
import { RedisClient, ICallback, ILogger } from 'redis-smq-common';
import {
CallbackEmptyReplyError,
ICallback,
ILogger,
RedisClient,
} from 'redis-smq-common';
import { processingQueue } from './processing-queue';
import { ERetryAction } from './retry-message';
import { ELuaScriptName } from '../../../common/redis-client/redis-client';
Expand Down Expand Up @@ -77,13 +82,45 @@ export class ConsumeMessage {
cause,
(err, reply) => {
if (err) this.messageHandler.handleError(err);
else if (!reply)
this.messageHandler.handleError(new CallbackEmptyReplyError());
else {
this.messageHandler.emit(events.MESSAGE_UNACKNOWLEDGED, msg, cause);
if (reply && reply.action === ERetryAction.DEAD_LETTER) {
const messageId = msg.getRequiredId();
const queue = msg.getDestinationQueue();
const messageHandlerId = this.messageHandler.getId();
const consumerId = this.messageHandler.getConsumerId();
this.messageHandler.emit(
events.MESSAGE_UNACKNOWLEDGED,
cause,
messageId,
queue,
messageHandlerId,
consumerId,
);
if (reply.action === ERetryAction.DEAD_LETTER) {
this.messageHandler.emit(
events.MESSAGE_DEAD_LETTERED,
msg,
reply.deadLetterCause,
messageId,
queue,
messageHandlerId,
consumerId,
);
} else if (reply.action === ERetryAction.DELAY) {
this.messageHandler.emit(
events.MESSAGE_DELAYED,
messageId,
queue,
messageHandlerId,
consumerId,
);
} else {
this.messageHandler.emit(
events.MESSAGE_REQUEUED,
messageId,
queue,
messageHandlerId,
consumerId,
);
}
}
Expand Down Expand Up @@ -118,15 +155,25 @@ export class ConsumeMessage {
} else {
this.acknowledgeMessage(msg, (err) => {
if (err) this.messageHandler.handleError(err);
else this.messageHandler.emit(events.MESSAGE_ACKNOWLEDGED, msg);
else
this.messageHandler.emit(
events.MESSAGE_ACKNOWLEDGED,
msg.getRequiredId(),
msg.getDestinationQueue(),
this.messageHandler.getId(),
this.messageHandler.getConsumerId(),
);
});
}
}
};

// As a safety measure, in case if we mess with message system
// properties, only a clone of the message is actually given
this.messageHandler.getHandler()(_fromMessage(msg), onConsumed);
this.messageHandler.getHandler()(
_fromMessage(msg, msg.getStatus(), msg.getMessageState()),
onConsumed,
);
} catch (error: unknown) {
this.logger.error(error);
this.unacknowledgeMessage(
Expand Down
164 changes: 80 additions & 84 deletions src/lib/consumer/message-handler/dequeue-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,25 @@

import * as os from 'os';
import {
EMessageProperty,
EMessagePropertyStatus,
EQueueType,
TQueueConsumer,
IQueueParams,
IQueueRateLimit,
TQueueConsumer,
} from '../../../../types';
import { redisKeys } from '../../../common/redis-keys/redis-keys';
import {
async,
CallbackEmptyReplyError,
ICallback,
RedisClient,
Ticker,
ICallback,
CallbackEmptyReplyError,
CallbackInvalidReplyError,
} from 'redis-smq-common';
import { events } from '../../../common/events/events';
import { MessageHandler } from './message-handler';
import { QueueRateLimit } from '../../queue/queue-rate-limit';
import { ELuaScriptName } from '../../../common/redis-client/redis-client';
import { QueueNotFoundError } from '../../queue/errors';
import { _getQueueProperties } from '../../queue/queue/_get-queue-properties';
import { _fromMessage } from '../../message/_from-message';

const IPAddresses = (() => {
const nets = os.networkInterfaces();
Expand All @@ -55,104 +51,104 @@ export class DequeueMessage {
protected queueRateLimit: IQueueRateLimit | null = null;
protected ticker: Ticker;
protected messageHandler: MessageHandler;
protected blockUntilMessageReceived: boolean;
protected queueType: EQueueType | null = null;

constructor(messageHandler: MessageHandler, redisClient: RedisClient) {
constructor(
messageHandler: MessageHandler,
redisClient: RedisClient,
blockUntilMessageReceived = true,
) {
this.messageHandler = messageHandler;
this.redisClient = redisClient;
this.blockUntilMessageReceived = blockUntilMessageReceived;
this.queue = messageHandler.getQueue();
this.consumerId = messageHandler.getConsumerId();
this.redisKeys = redisKeys.getQueueConsumerKeys(
this.queue,
this.consumerId,
);
this.ticker = new Ticker(() => this.dequeue());
this.ticker = new Ticker(() => {
this.messageHandler.emit(events.MESSAGE_NEXT);
});
}

protected emitReceivedMessage(messageId: string): void {
const { keyMessage } = redisKeys.getMessageKeys(messageId);
const keys: string[] = [keyMessage];
const argv: (string | number)[] = [
EMessageProperty.STATUS,
EMessageProperty.STATE,
EMessageProperty.MESSAGE,
EMessagePropertyStatus.PROCESSING,
];
this.redisClient.runScript(
ELuaScriptName.FETCH_MESSAGE_FOR_PROCESSING,
keys,
argv,
(err, reply: unknown) => {
if (err) this.messageHandler.handleError(err);
else if (!reply)
this.messageHandler.handleError(new CallbackEmptyReplyError());
else if (!Array.isArray(reply))
this.messageHandler.handleError(new CallbackInvalidReplyError());
else {
const [state, msg]: string[] = reply;
const message = _fromMessage(msg, state);
this.messageHandler.emit(events.MESSAGE_RECEIVED, message);
}
},
);
}
protected handleMessage: ICallback<string | null> = (err, messageId) => {
if (err) {
this.ticker.abort();
this.messageHandler.handleError(err);
} else if (typeof messageId === 'string') {
this.messageHandler.emit(
events.MESSAGE_RECEIVED,
messageId,
this.queue,
this.consumerId,
);
} else {
this.ticker.nextTick();
}
};

protected dequeueMessageWithPriority(cb: ICallback<string | null>): void {
this.redisClient.zpoprpush(
this.redisKeys.keyPriorityQueuePending,
this.redisKeys.keyQueueProcessing,
cb,
);
}
protected dequeueWithRateLimit = (): boolean => {
if (this.queueRateLimit) {
QueueRateLimit.hasExceeded(
this.redisClient,
this.queue,
this.queueRateLimit,
(err, isExceeded) => {
if (err) this.messageHandler.handleError(err);
else if (isExceeded) this.ticker.nextTick();
else this.dequeueWithRateLimitExec();
},
);
return true;
}
return false;
};

protected waitForMessage(cb: ICallback<string | null>): void {
this.redisClient.brpoplpush(
this.redisKeys.keyQueuePending,
this.redisKeys.keyQueueProcessing,
0,
cb,
);
}
protected dequeueWithRateLimitExec = () => {
if (this.isPriorityQueuingEnabled()) this.dequeueWithPriority();
else this.dequeueAndReturn();
};

protected dequeueWithPriority = (): boolean => {
if (this.isPriorityQueuingEnabled()) {
this.redisClient.zpoprpush(
this.redisKeys.keyPriorityQueuePending,
this.redisKeys.keyQueueProcessing,
this.handleMessage,
);
return true;
}
return false;
};

protected dequeueMessage(cb: ICallback<string | null>): void {
protected dequeueAndBlock = (): boolean => {
if (this.blockUntilMessageReceived) {
this.redisClient.brpoplpush(
this.redisKeys.keyQueuePending,
this.redisKeys.keyQueueProcessing,
0,
this.handleMessage,
);
return true;
}
return false;
};

protected dequeueAndReturn = (): void => {
this.redisClient.rpoplpush(
this.redisKeys.keyQueuePending,
this.redisKeys.keyQueueProcessing,
cb,
this.handleMessage,
);
}
};

dequeue(): void {
const cb: ICallback<string | null> = (err, reply) => {
if (err) {
this.ticker.abort();
this.messageHandler.handleError(err);
} else if (typeof reply === 'string') {
this.emitReceivedMessage(reply);
} else {
this.ticker.nextTick();
}
};
const deq = () => {
if (this.isPriorityQueuingEnabled()) this.dequeueMessageWithPriority(cb);
else this.dequeueMessage(cb);
};
if (this.isPriorityQueuingEnabled() || this.queueRateLimit) {
if (this.queueRateLimit) {
QueueRateLimit.hasExceeded(
this.redisClient,
this.queue,
this.queueRateLimit,
(err, isExceeded) => {
if (err) this.messageHandler.handleError(err);
else if (isExceeded) this.ticker.nextTick();
else deq();
},
);
} else deq();
} else {
this.waitForMessage(cb);
}
this.dequeueWithRateLimit() ||
this.dequeueWithPriority() ||
this.dequeueAndBlock() ||
this.dequeueAndReturn();
}

protected isPriorityQueuingEnabled(): boolean {
Expand Down
5 changes: 3 additions & 2 deletions src/lib/consumer/message-handler/message-handler-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ export class MessageHandlerRunner {
handlerParams,
)}) has been registered.`,
);
if (this.consumer.isRunning()) this.runMessageHandler(handlerParams, cb);
else cb();
if (this.consumer.isRunning()) {
this.runMessageHandler(handlerParams, cb);
} else cb();
}
}

Expand Down
Loading

0 comments on commit 56566bf

Please sign in to comment.