Skip to content

Commit

Permalink
test(QueueMessages): move message methods to Message,add MessageEnvelope
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Dec 10, 2023
1 parent f71f0d0 commit 932a88e
Show file tree
Hide file tree
Showing 69 changed files with 262 additions and 238 deletions.
4 changes: 2 additions & 2 deletions tests/common/consumer-thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import { Consumer } from '../../src/lib/consumer/consumer';
import { defaultQueue } from './message-producing-consuming';
import { Producer } from '../../src/lib/producer/producer';
import { Message } from '../../src/lib/message/message';
import { MessageEnvelope } from '../../src/lib/message/message-envelope';

const producer = new Producer();
producer.run((err) => {
if (err) throw err;
producer.produce(
new Message().setQueue(defaultQueue).setBody(123).setRetryDelay(0),
new MessageEnvelope().setQueue(defaultQueue).setBody(123).setRetryDelay(0),
(err) => {
if (err) throw err;
},
Expand Down
14 changes: 7 additions & 7 deletions tests/common/message-producing-consuming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { EQueueType, IQueueParams } from '../../types';
import { Message } from '../../src/lib/message/message';
import { MessageEnvelope } from '../../src/lib/message/message-envelope';
import { untilConsumerEvent, untilMessageAcknowledged } from './events';
import { getConsumer } from './consumer';
import { getProducer } from './producer';
Expand All @@ -35,7 +35,7 @@ export async function produceAndAcknowledgeMessage(
}),
});

const message = new Message();
const message = new MessageEnvelope();
message.setBody({ hello: 'world' }).setQueue(queue);
const { messages } = await producer.produceAsync(message);

Expand All @@ -57,7 +57,7 @@ export async function produceAndDeadLetterMessage(
}),
});

const message = new Message();
const message = new MessageEnvelope();
message.setBody({ hello: 'world' }).setQueue(queue);
const { messages } = await producer.produceAsync(message);

Expand All @@ -70,7 +70,7 @@ export async function produceMessage(queue: IQueueParams = defaultQueue) {
const producer = getProducer();
await producer.runAsync();

const message = new Message();
const message = new MessageEnvelope();
message.setBody({ hello: 'world' }).setQueue(queue);
const { messages } = await producer.produceAsync(message);
return { producer, messageId: messages[0], queue };
Expand All @@ -82,8 +82,8 @@ export async function produceMessageWithPriority(
const producer = getProducer();
await producer.runAsync();

const message = new Message();
message.setPriority(Message.MessagePriority.LOW).setQueue(queue);
const message = new MessageEnvelope();
message.setPriority(MessageEnvelope.MessagePriority.LOW).setQueue(queue);
const { messages } = await producer.produceAsync(message);
return { messageId: messages[0], producer, queue };
}
Expand All @@ -92,7 +92,7 @@ export async function scheduleMessage(queue: IQueueParams = defaultQueue) {
const producer = getProducer();
await producer.runAsync();

const message = new Message();
const message = new MessageEnvelope();
message.setScheduledDelay(10000).setQueue(queue);
const { messages } = await producer.produceAsync(message);
return { messageId: messages[0], producer, queue };
Expand Down
4 changes: 2 additions & 2 deletions tests/common/start-up.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import { getRedisInstance } from './redis';
import { logger } from 'redis-smq-common';
import { Configuration } from '../../src/config/configuration';
import { config } from './config';
import { Message } from '../../src/lib/message/message';
import { MessageEnvelope } from '../../src/lib/message/message-envelope';

export async function startUp(): Promise<void> {
Configuration.reset();
Configuration.getSetConfig(config);
Message.setDefaultConsumeOptions({
MessageEnvelope.setDefaultConsumeOptions({
ttl: 0,
retryThreshold: 3,
retryDelay: 0,
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00002.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../../index';
import { MessageEnvelope } from '../../../index';
import { MessageState } from '../../../src/lib/message/message-state';
import { untilMessageAcknowledged } from '../../common/events';
import { getConsumer } from '../../common/consumer';
Expand All @@ -26,7 +26,7 @@ test('Produce and consume 1 message', async () => {
messageHandler: (msg1, cb) => cb(),
});

const msg = new Message();
const msg = new MessageEnvelope();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

expect(msg.getMessageState()).toBe(null);
Expand Down
8 changes: 4 additions & 4 deletions tests/tests/consuming-messages/test00003.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../../index';
import { MessageEnvelope } from '../../../index';
import { delay } from 'bluebird';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -24,15 +24,15 @@ test('Produce and consume 100 message: LIFO Queues', async () => {
await producer.runAsync();

const total = 100;
const publishedMsg: Message[] = [];
const publishedMsg: MessageEnvelope[] = [];
for (let i = 0; i < total; i += 1) {
const msg = new Message();
const msg = new MessageEnvelope();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);
await producer.produceAsync(msg);
publishedMsg.push(msg);
}

const deliveredMessages: Message[] = [];
const deliveredMessages: MessageEnvelope[] = [];
const consumer = getConsumer({
messageHandler: (msg, cb) => {
deliveredMessages.push(msg);
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00004.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { delay } from 'bluebird';
import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -32,7 +32,7 @@ test('A message is dead-lettered and not delivered when messageTTL is exceeded',
unacknowledged += 1;
});

const msg = new Message();
const msg = new MessageEnvelope();
msg.setBody({ hello: 'world' }).setTTL(3000).setQueue(defaultQueue);

await producer.produceAsync(msg);
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00005.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { delay } from 'bluebird';
import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -31,7 +31,7 @@ test('Setting default message TTL from configuration', async () => {
consumer.on('messageUnacknowledged', () => {
unacks += 1;
});
const msg = new Message();
const msg = new MessageEnvelope();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue).setTTL(2000);

await producer.produceAsync(msg);
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00006.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ICallback } from 'redis-smq-common';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
Expand All @@ -33,7 +33,7 @@ test('A message is unacknowledged when messageConsumeTimeout is exceeded', async
}),
});

const msg = new Message();
const msg = new MessageEnvelope();
msg
.setBody({ hello: 'world' })
.setQueue(defaultQueue)
Expand Down
6 changes: 3 additions & 3 deletions tests/tests/consuming-messages/test00007.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ICallback } from 'redis-smq-common';
import { untilMessageAcknowledged } from '../../common/events';
import { getConsumer } from '../../common/consumer';
Expand All @@ -25,7 +25,7 @@ test('Unacknowledged message are re-queued when messageRetryThreshold is not exc

let callCount = 0;
const consumer = getConsumer({
messageHandler: jest.fn((msg: Message, cb: ICallback<void>) => {
messageHandler: jest.fn((msg: MessageEnvelope, cb: ICallback<void>) => {
callCount += 1;
if (callCount === 1) throw new Error('Explicit error');
else if (callCount === 2) cb();
Expand All @@ -43,7 +43,7 @@ test('Unacknowledged message are re-queued when messageRetryThreshold is not exc
acknowledged += 1;
});

const msg = new Message();
const msg = new MessageEnvelope();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

await producer.produceAsync(msg);
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00008.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { delay } from 'bluebird';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand Down Expand Up @@ -45,7 +45,7 @@ test('Async exceptions are caught when consuming a message', async () => {
acknowledged += 1;
});

const msg = new Message();
const msg = new MessageEnvelope();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

await producer.produceAsync(msg);
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00009.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { delay } from 'bluebird';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -33,7 +33,7 @@ test('A message is dead-lettered when messageRetryThreshold is exceeded', async
unacknowledged += 1;
});

const msg = new Message();
const msg = new MessageEnvelope();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

await producer.produceAsync(msg);
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00011.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { delay } from 'bluebird';
import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
import {
Expand Down Expand Up @@ -58,7 +58,7 @@ test('Given many consumers, a message is delivered only to one consumer', async
/**
*
*/
const msg = new Message();
const msg = new MessageEnvelope();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

const producer = getProducer();
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00012.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { untilMessageAcknowledged } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand Down Expand Up @@ -44,7 +44,7 @@ test('An unacknowledged message is delayed given messageRetryDelay > 0 and messa
acks += 1;
});

const msg = new Message();
const msg = new MessageEnvelope();
msg
.setBody({ hello: 'world' })
.setQueue(defaultQueue)
Expand Down
10 changes: 5 additions & 5 deletions tests/tests/consuming-messages/test00013.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { delay } from 'bluebird';
import { ICallback } from 'redis-smq-common';
import { getConsumer } from '../../common/consumer';
Expand All @@ -19,7 +19,7 @@ import {
} from '../../common/message-producing-consuming';

type TQueueMetrics = {
receivedMessages: Message[];
receivedMessages: MessageEnvelope[];
acks: number;
};

Expand All @@ -38,7 +38,7 @@ test('Given many queues, a message is recovered from a consumer crash and re-que

const queueAConsumer = getConsumer({
queue: defaultQueue,
messageHandler: (msg: Message, cb: ICallback<void>) => {
messageHandler: (msg: MessageEnvelope, cb: ICallback<void>) => {
defaultQueueMetrics.receivedMessages.push(msg);
cb();
},
Expand All @@ -50,7 +50,7 @@ test('Given many queues, a message is recovered from a consumer crash and re-que

const queueBConsumer = getConsumer({
queue: 'queue_b',
messageHandler: (msg: Message, cb: ICallback<void>) => {
messageHandler: (msg: MessageEnvelope, cb: ICallback<void>) => {
queueBMetrics.receivedMessages.push(msg);
cb();
},
Expand All @@ -64,7 +64,7 @@ test('Given many queues, a message is recovered from a consumer crash and re-que
await producer.runAsync();

// Produce a message to QUEUE B
const anotherMsg = new Message();
const anotherMsg = new MessageEnvelope();
anotherMsg.setBody({ id: 'b' }).setQueue('queue_b');
await producer.produceAsync(anotherMsg);

Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00014.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { untilMessageAcknowledged } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -22,7 +22,7 @@ test('Consume message from different queues and published by a single producer i
const queue = `QuEue_${i}`;
await createQueue(queue, false);

const message = new Message();
const message = new MessageEnvelope();
// queue name should be normalized to lowercase
message.setBody(`Message ${i}`).setQueue(queue);
await producer.produceAsync(message);
Expand Down
10 changes: 7 additions & 3 deletions tests/tests/consuming-messages/test00016.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { promisifyAll } from 'bluebird';
import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { Consumer } from '../../../src/lib/consumer/consumer';
import { untilMessageAcknowledged } from '../../common/events';
import { getProducer } from '../../common/producer';
Expand All @@ -31,11 +31,15 @@ test('Consume message from different queues using a single consumer instance: ca
const producer = getProducer();
await producer.runAsync();

const msg1 = new Message().setQueue('test_queue').setBody('some data');
const msg1 = new MessageEnvelope()
.setQueue('test_queue')
.setBody('some data');
const { messages } = await producer.produceAsync(msg1);
await untilMessageAcknowledged(consumer, messages[0]);

const msg2 = new Message().setQueue('another_queue').setBody('some data');
const msg2 = new MessageEnvelope()
.setQueue('another_queue')
.setBody('some data');
const { messages: m } = await producer.produceAsync(msg2);
await untilMessageAcknowledged(consumer, m[0]);

Expand Down
2 changes: 1 addition & 1 deletion tests/tests/consuming-messages/test00017.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { getQueueDeadLetteredMessages } from '../../common/queue-dead-lettered-m
import { getQueueAcknowledgedMessages } from '../../common/queue-acknowledged-messages';
import { Configuration } from '../../../src/config/configuration';

test('Message storage: storeMessages = false', async () => {
test('MessageEnvelope storage: storeMessages = false', async () => {
const cfg = merge(config, {
messages: {
store: false,
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00018.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import {
defaultQueue,
} from '../../common/message-producing-consuming';
import { ProducerMessageAlreadyPublishedError } from '../../../src/lib/producer/errors';
import { Message } from '../../../src/lib/message/message';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { getProducer } from '../../common/producer';

test('Producing duplicate message', async () => {
await createQueue(defaultQueue, false);
const m = new Message().setQueue(defaultQueue).setBody('123');
const m = new MessageEnvelope().setQueue(defaultQueue).setBody('123');
const p = await getProducer();
await p.runAsync();
await p.produceAsync(m);
Expand Down

0 comments on commit 932a88e

Please sign in to comment.