Skip to content

Commit

Permalink
test: add ProducibleMessage/ConsumableMessage/MessageEnvelope
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Dec 17, 2023
1 parent 4a3eec7 commit 2961a22
Show file tree
Hide file tree
Showing 66 changed files with 304 additions and 390 deletions.
7 changes: 5 additions & 2 deletions tests/common/consumer-thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import { Consumer } from '../../src/lib/consumer/consumer';
import { defaultQueue } from './message-producing-consuming';
import { Producer } from '../../src/lib/producer/producer';
import { MessageEnvelope } from '../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../src/lib/message/producible-message';

const producer = new Producer();
producer.run((err) => {
if (err) throw err;
producer.produce(
new MessageEnvelope().setQueue(defaultQueue).setBody(123).setRetryDelay(0),
new ProducibleMessage()
.setQueue(defaultQueue)
.setBody(123)
.setRetryDelay(0),
(err) => {
if (err) throw err;
},
Expand Down
32 changes: 16 additions & 16 deletions tests/common/message-producing-consuming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { EMessagePriority, EQueueType, IQueueParams } from '../../types';
import { MessageEnvelope } from '../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../src/lib/message/producible-message';
import { untilConsumerEvent, untilMessageAcknowledged } from './events';
import { getConsumer } from './consumer';
import { getProducer } from './producer';
Expand All @@ -35,13 +35,13 @@ export async function produceAndAcknowledgeMessage(
}),
});

const message = new MessageEnvelope();
const message = new ProducibleMessage();
message.setBody({ hello: 'world' }).setQueue(queue);
const { messages } = await producer.produceAsync(message);
const [messageId] = await producer.produceAsync(message);

consumer.run();
await untilMessageAcknowledged(consumer);
return { producer, consumer, queue, messageId: messages[0] };
return { producer, consumer, queue, messageId };
}

export async function produceAndDeadLetterMessage(
Expand All @@ -57,23 +57,23 @@ export async function produceAndDeadLetterMessage(
}),
});

const message = new MessageEnvelope();
const message = new ProducibleMessage();
message.setBody({ hello: 'world' }).setQueue(queue);
const { messages } = await producer.produceAsync(message);
const [messageId] = await producer.produceAsync(message);

consumer.run();
await untilConsumerEvent(consumer, 'messageDeadLettered');
return { producer, consumer, messageId: messages[0], queue };
return { producer, consumer, messageId, queue };
}

export async function produceMessage(queue: IQueueParams = defaultQueue) {
const producer = getProducer();
await producer.runAsync();

const message = new MessageEnvelope();
const message = new ProducibleMessage();
message.setBody({ hello: 'world' }).setQueue(queue);
const { messages } = await producer.produceAsync(message);
return { producer, messageId: messages[0], queue };
const [messageId] = await producer.produceAsync(message);
return { producer, messageId, queue };
}

export async function produceMessageWithPriority(
Expand All @@ -82,20 +82,20 @@ export async function produceMessageWithPriority(
const producer = getProducer();
await producer.runAsync();

const message = new MessageEnvelope();
const message = new ProducibleMessage();
message.setPriority(EMessagePriority.LOW).setQueue(queue);
const { messages } = await producer.produceAsync(message);
return { messageId: messages[0], producer, queue };
const [messageId] = await producer.produceAsync(message);
return { messageId, producer, queue };
}

export async function scheduleMessage(queue: IQueueParams = defaultQueue) {
const producer = getProducer();
await producer.runAsync();

const message = new MessageEnvelope();
const message = new ProducibleMessage();
message.setScheduledDelay(10000).setQueue(queue);
const { messages } = await producer.produceAsync(message);
return { messageId: messages[0], producer, queue };
const [messageId] = await producer.produceAsync(message);
return { messageId, producer, queue };
}

export async function createQueue(
Expand Down
4 changes: 2 additions & 2 deletions tests/common/start-up.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import { getRedisInstance } from './redis';
import { logger } from 'redis-smq-common';
import { Configuration } from '../../src/config/configuration';
import { config } from './config';
import { MessageEnvelope } from '../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../src/lib/message/producible-message';

export async function startUp(): Promise<void> {
Configuration.reset();
Configuration.getSetConfig(config);
MessageEnvelope.setDefaultConsumeOptions({
ProducibleMessage.setDefaultConsumeOptions({
ttl: 0,
retryThreshold: 3,
retryDelay: 0,
Expand Down
16 changes: 4 additions & 12 deletions tests/tests/consuming-messages/test00002.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
* in the root directory of this source tree.
*/

import { MessageEnvelope } from '../../../index';
import { MessageState } from '../../../src/lib/message/message-state';
import { ProducibleMessage } from '../../../index';
import { untilMessageAcknowledged } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -26,18 +25,11 @@ test('Produce and consume 1 message', async () => {
messageHandler: (msg1, cb) => cb(),
});

const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

expect(msg.getMessageState()).toBe(null);
expect(msg.getId()).toBe(null);

await producer.produceAsync(msg);

expect((msg.getMessageState() ?? {}) instanceof MessageState).toBe(true);
expect(typeof msg.getId() === 'string').toBe(true);

const [messageId] = await producer.produceAsync(msg);
consumer.run();

await untilMessageAcknowledged(consumer, msg.getRequiredId());
await untilMessageAcknowledged(consumer, messageId);
});
18 changes: 8 additions & 10 deletions tests/tests/consuming-messages/test00003.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { MessageEnvelope } from '../../../index';
import { ProducibleMessage } from '../../../index';
import { delay } from 'bluebird';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -24,18 +24,18 @@ test('Produce and consume 100 message: LIFO Queues', async () => {
await producer.runAsync();

const total = 100;
const publishedMsg: MessageEnvelope[] = [];
const publishedMsg: string[] = [];
for (let i = 0; i < total; i += 1) {
const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);
await producer.produceAsync(msg);
publishedMsg.push(msg);
const [id] = await producer.produceAsync(msg);
publishedMsg.push(id);
}

const deliveredMessages: MessageEnvelope[] = [];
const deliveredMessages: string[] = [];
const consumer = getConsumer({
messageHandler: (msg, cb) => {
deliveredMessages.push(msg);
deliveredMessages.push(msg.getId());
cb();
},
});
Expand All @@ -44,8 +44,6 @@ test('Produce and consume 100 message: LIFO Queues', async () => {

expect(deliveredMessages.length).toEqual(publishedMsg.length);
for (let i = 0; i < total; i += 1) {
expect(publishedMsg[i].getRequiredId()).toStrictEqual(
deliveredMessages[total - i - 1].getRequiredId(),
);
expect(publishedMsg[i]).toStrictEqual(deliveredMessages[total - i - 1]);
}
});
8 changes: 4 additions & 4 deletions tests/tests/consuming-messages/test00004.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { delay } from 'bluebird';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -32,10 +32,10 @@ test('A message is dead-lettered and not delivered when messageTTL is exceeded',
unacknowledged += 1;
});

const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg.setBody({ hello: 'world' }).setTTL(3000).setQueue(defaultQueue);

await producer.produceAsync(msg);
const [id] = await producer.produceAsync(msg);
await delay(5000);
consumer.run();

Expand All @@ -49,5 +49,5 @@ test('A message is dead-lettered and not delivered when messageTTL is exceeded',
100,
);
expect(list.totalItems).toBe(1);
expect(list.items[0].getId()).toBe(msg.getRequiredId());
expect(list.items[0].getId()).toBe(id);
});
8 changes: 4 additions & 4 deletions tests/tests/consuming-messages/test00005.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { delay } from 'bluebird';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -31,10 +31,10 @@ test('Setting default message TTL from configuration', async () => {
consumer.on('messageUnacknowledged', () => {
unacks += 1;
});
const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue).setTTL(2000);

await producer.produceAsync(msg);
const [id] = await producer.produceAsync(msg);
await delay(5000);
consumer.run();

Expand All @@ -49,5 +49,5 @@ test('Setting default message TTL from configuration', async () => {
100,
);
expect(list.totalItems).toBe(1);
expect(list.items[0].getId()).toBe(msg.getRequiredId());
expect(list.items[0].getId()).toBe(id);
});
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00006.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { ICallback } from 'redis-smq-common';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
Expand All @@ -33,7 +33,7 @@ test('A message is unacknowledged when messageConsumeTimeout is exceeded', async
}),
});

const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg
.setBody({ hello: 'world' })
.setQueue(defaultQueue)
Expand Down
7 changes: 4 additions & 3 deletions tests/tests/consuming-messages/test00007.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { ICallback } from 'redis-smq-common';
import { untilMessageAcknowledged } from '../../common/events';
import { getConsumer } from '../../common/consumer';
Expand All @@ -16,6 +16,7 @@ import {
createQueue,
defaultQueue,
} from '../../common/message-producing-consuming';
import { IConsumableMessage } from '../../../types';

test('Unacknowledged message are re-queued when messageRetryThreshold is not exceeded', async () => {
const producer = getProducer();
Expand All @@ -25,7 +26,7 @@ test('Unacknowledged message are re-queued when messageRetryThreshold is not exc

let callCount = 0;
const consumer = getConsumer({
messageHandler: jest.fn((msg: MessageEnvelope, cb: ICallback<void>) => {
messageHandler: jest.fn((msg: IConsumableMessage, cb: ICallback<void>) => {
callCount += 1;
if (callCount === 1) throw new Error('Explicit error');
else if (callCount === 2) cb();
Expand All @@ -43,7 +44,7 @@ test('Unacknowledged message are re-queued when messageRetryThreshold is not exc
acknowledged += 1;
});

const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

await producer.produceAsync(msg);
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00008.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { delay } from 'bluebird';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand Down Expand Up @@ -45,7 +45,7 @@ test('Async exceptions are caught when consuming a message', async () => {
acknowledged += 1;
});

const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

await producer.produceAsync(msg);
Expand Down
8 changes: 4 additions & 4 deletions tests/tests/consuming-messages/test00009.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { delay } from 'bluebird';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -33,10 +33,10 @@ test('A message is dead-lettered when messageRetryThreshold is exceeded', async
unacknowledged += 1;
});

const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

await producer.produceAsync(msg);
const [id] = await producer.produceAsync(msg);
consumer.run();

await delay(30000);
Expand All @@ -48,5 +48,5 @@ test('A message is dead-lettered when messageRetryThreshold is exceeded', async
100,
);
expect(list.totalItems).toBe(1);
expect(list.items[0].getId()).toBe(msg.getRequiredId());
expect(list.items[0].getId()).toBe(id);
});
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00011.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { delay } from 'bluebird';
import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
import {
Expand Down Expand Up @@ -58,7 +58,7 @@ test('Given many consumers, a message is delivered only to one consumer', async
/**
*
*/
const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg.setBody({ hello: 'world' }).setQueue(defaultQueue);

const producer = getProducer();
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00012.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* in the root directory of this source tree.
*/

import { MessageEnvelope } from '../../../src/lib/message/message-envelope';
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { untilMessageAcknowledged } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand Down Expand Up @@ -44,7 +44,7 @@ test('An unacknowledged message is delayed given messageRetryDelay > 0 and messa
acks += 1;
});

const msg = new MessageEnvelope();
const msg = new ProducibleMessage();
msg
.setBody({ hello: 'world' })
.setQueue(defaultQueue)
Expand Down
Loading

0 comments on commit 2961a22

Please sign in to comment.