Skip to content

Commit

Permalink
tests: add Pub/Sub delivery model, refactor and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Jan 25, 2024
1 parent 32db9dd commit 11e2542
Show file tree
Hide file tree
Showing 33 changed files with 1,033 additions and 83 deletions.
1 change: 1 addition & 0 deletions tests/common/consumer-thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ producer.run((err) => {
.setBody(123)
.setRetryDelay(0),
(err) => {
console.log('GGGGGGG', err);
if (err) throw err;
},
);
Expand Down
13 changes: 11 additions & 2 deletions tests/common/message-producing-consuming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
* in the root directory of this source tree.
*/

import { EMessagePriority, EQueueType, IQueueParams } from '../../types';
import {
EMessagePriority,
EQueueDeliveryModel,
EQueueType,
IQueueParams,
} from '../../types';
import { ProducibleMessage } from '../../src/lib/message/producible-message';
import { untilConsumerEvent, untilMessageAcknowledged } from './events';
import { getConsumer } from './consumer';
Expand Down Expand Up @@ -109,7 +114,11 @@ export async function createQueue(
? EQueueType.PRIORITY_QUEUE
: EQueueType.LIFO_QUEUE
: mixed;
await queueInstance.saveAsync(queue, type);
await queueInstance.saveAsync(
queue,
type,
EQueueDeliveryModel.POINT_TO_POINT,
);
}

export async function crashAConsumerConsumingAMessage() {
Expand Down
2 changes: 1 addition & 1 deletion tests/common/queue-pending-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { promisifyAll } from 'bluebird';
import { QueuePendingMessages } from '../../src/lib/queue/queue-pending-messages';
import { QueuePendingMessages } from '../../src/lib/queue/queue-pending-messages/queue-pending-messages';

export async function getQueuePendingMessages() {
return promisifyAll(new QueuePendingMessages());
Expand Down
2 changes: 1 addition & 1 deletion tests/common/queue-rate-limit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

import { promisifyAll } from 'bluebird';
import { QueueRateLimit } from '../../src/lib/queue/queue-rate-limit';
import { QueueRateLimit } from '../../src/lib/queue/queue-rate-limit/queue-rate-limit';

export async function getQueueRateLimit() {
return promisifyAll(new QueueRateLimit());
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/consuming-messages/test00010.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ test('A message is not lost in case of a consumer crash', async () => {
await crashAConsumerConsumingAMessage();

/**
* Consumer2 re-queues failed message and consume it!
* Consumer2 re-queues failed message and consumes it!
*/
const consumer2 = getConsumer({
messageHandler: jest.fn((msg, cb) => {
Expand Down
46 changes: 31 additions & 15 deletions tests/tests/consuming-messages/test00015.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import { promisifyAll } from 'bluebird';
import { Consumer } from '../../../src/lib/consumer/consumer';
import { shutDownBaseInstance } from '../../common/base-instance';
import { EQueueType } from '../../../types';
import { EQueueDeliveryModel, EQueueType } from '../../../types';
import { getQueue } from '../../common/queue';

test('Consume message from different queues using a single consumer instance: case 1', async () => {
Expand All @@ -19,10 +19,18 @@ test('Consume message from different queues using a single consumer instance: ca

expect(consumer.getQueues()).toEqual([]);

await queueInstance.saveAsync('test_queue', EQueueType.LIFO_QUEUE);
await queueInstance.saveAsync(
'test_queue',
EQueueType.LIFO_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await consumer.consumeAsync('test_queue', (msg, cb) => cb());

await queueInstance.saveAsync('another_queue', EQueueType.LIFO_QUEUE);
await queueInstance.saveAsync(
'another_queue',
EQueueType.LIFO_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await consumer.consumeAsync('another_queue', (msg, cb) => cb());

expect(
Expand All @@ -32,19 +40,21 @@ test('Consume message from different queues using a single consumer instance: ca
);

expect(consumer.getQueues()).toEqual([
{ name: 'test_queue', ns: 'testing' },
{ name: 'another_queue', ns: 'testing' },
{ queueParams: { name: 'test_queue', ns: 'testing' }, groupId: null },
{ queueParams: { name: 'another_queue', ns: 'testing' }, groupId: null },
]);

await consumer.cancelAsync('another_queue');

expect(consumer.getQueues()).toEqual([{ name: 'test_queue', ns: 'testing' }]);
expect(consumer.getQueues()).toEqual([
{ queueParams: { name: 'test_queue', ns: 'testing' }, groupId: null },
]);

await consumer.consumeAsync('another_queue', (msg, cb) => cb());

expect(consumer.getQueues()).toEqual([
{ name: 'test_queue', ns: 'testing' },
{ name: 'another_queue', ns: 'testing' },
{ queueParams: { name: 'test_queue', ns: 'testing' }, groupId: null },
{ queueParams: { name: 'another_queue', ns: 'testing' }, groupId: null },
]);

const res = await consumer.runAsync();
Expand All @@ -61,26 +71,32 @@ test('Consume message from different queues using a single consumer instance: ca
// does not throw an error
await consumer.cancelAsync('another_queue');

expect(consumer.getQueues()).toEqual([{ name: 'test_queue', ns: 'testing' }]);
expect(consumer.getQueues()).toEqual([
{ queueParams: { name: 'test_queue', ns: 'testing' }, groupId: null },
]);

await consumer.consumeAsync('another_queue', (msg, cb) => cb());

expect(consumer.getQueues()).toEqual([
{ name: 'test_queue', ns: 'testing' },
{ name: 'another_queue', ns: 'testing' },
{ queueParams: { name: 'test_queue', ns: 'testing' }, groupId: null },
{ queueParams: { name: 'another_queue', ns: 'testing' }, groupId: null },
]);

await queueInstance.saveAsync('queue_a', EQueueType.PRIORITY_QUEUE);
await queueInstance.saveAsync(
'queue_a',
EQueueType.PRIORITY_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await consumer.consumeAsync('queue_a', (msg, cb) => cb());

expect(consumer.consumeAsync('queue_a', (msg, cb) => cb())).rejects.toThrow(
`A message handler for queue [queue_a@testing] already exists`,
);

expect(consumer.getQueues()).toEqual([
{ name: 'test_queue', ns: 'testing' },
{ name: 'another_queue', ns: 'testing' },
{ name: 'queue_a', ns: 'testing' },
{ queueParams: { name: 'test_queue', ns: 'testing' }, groupId: null },
{ queueParams: { name: 'another_queue', ns: 'testing' }, groupId: null },
{ queueParams: { name: 'queue_a', ns: 'testing' }, groupId: null },
]);

await shutDownBaseInstance(consumer);
Expand Down
8 changes: 5 additions & 3 deletions tests/tests/consuming-messages/test00020.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
* in the root directory of this source tree.
*/

import { ProducibleMessage } from '../../../index';
import {
ProducerMessageExchangeRequiredError,
ProducibleMessage,
} from '../../../index';
import { getProducer } from '../../common/producer';
import { MessageExchangeRequiredError } from '../../../src/lib/message/errors';

test('Producing a message without a message queue', async () => {
const producer = getProducer();
Expand All @@ -20,5 +22,5 @@ test('Producing a message without a message queue', async () => {

await expect(async () => {
await producer.produceAsync(msg);
}).rejects.toThrow(MessageExchangeRequiredError);
}).rejects.toThrow(ProducerMessageExchangeRequiredError);
});
8 changes: 6 additions & 2 deletions tests/tests/consuming-messages/test00032.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ import { consumerQueues } from '../../../src/lib/consumer/consumer-queues';
import { getRedisInstance } from '../../common/redis';
import { defaultQueue } from '../../common/message-producing-consuming';
import { shutDownBaseInstance } from '../../common/base-instance';
import { EQueueType } from '../../../types';
import { EQueueDeliveryModel, EQueueType } from '../../../types';
import { getQueue } from '../../common/queue';

test('Consume message from different queues using a single consumer instance: case 3', async () => {
const queue = await getQueue();
await queue.saveAsync(defaultQueue, EQueueType.LIFO_QUEUE);
await queue.saveAsync(
defaultQueue,
EQueueType.LIFO_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);

const consumer = promisifyAll(new Consumer());
await consumer.runAsync();
Expand Down
6 changes: 3 additions & 3 deletions tests/tests/consuming-messages/test00033.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ test('Consume message from different queues using a single consumer instance: ca
});

const queues = consumer.getQueues();
expect(queues.map((i) => i.name)).toEqual([
expect(queues.map((i) => i.queueParams.name)).toEqual([
'test1',
'test2',
'test3',
Expand Down Expand Up @@ -86,7 +86,7 @@ test('Consume message from different queues using a single consumer instance: ca
]);

await consumer.cancelAsync('test4');
expect(consumer.getQueues().map((i) => i.name)).toEqual([
expect(consumer.getQueues().map((i) => i.queueParams.name)).toEqual([
'test1',
'test2',
'test3',
Expand All @@ -109,7 +109,7 @@ test('Consume message from different queues using a single consumer instance: ca
'test5',
'test6',
]);
expect(consumer.getQueues().map((i) => i.name)).toEqual([
expect(consumer.getQueues().map((i) => i.queueParams.name)).toEqual([
'test1',
'test2',
'test3',
Expand Down
42 changes: 35 additions & 7 deletions tests/tests/consuming-messages/test00035.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import { Consumer } from '../../../src/lib/consumer/consumer';
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { getProducer } from '../../common/producer';
import { shutDownBaseInstance } from '../../common/base-instance';
import { EQueueType, IConsumableMessage } from '../../../types';
import {
EQueueDeliveryModel,
EQueueType,
IConsumableMessage,
} from '../../../types';
import { getQueue } from '../../common/queue';

test('Consume message from different queues using a single consumer instance: case 6', async () => {
Expand All @@ -24,7 +28,11 @@ test('Consume message from different queues using a single consumer instance: ca
await delay(5000);

const queue = await getQueue();
await queue.saveAsync('test0', EQueueType.LIFO_QUEUE);
await queue.saveAsync(
'test0',
EQueueType.LIFO_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await consumer.consumeAsync('test0', () => void 0);

consumer.once('messageReceived', () => {
Expand All @@ -46,31 +54,51 @@ test('Consume message from different queues using a single consumer instance: ca
await delay(10000);
expect(consumer.getQueues()).toEqual([]);

await queue.saveAsync('test1', EQueueType.PRIORITY_QUEUE);
await queue.saveAsync(
'test1',
EQueueType.PRIORITY_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await consumer.consumeAsync('test1', (msg, cb) => {
messages.push(msg);
cb();
});

await queue.saveAsync('test2', EQueueType.PRIORITY_QUEUE);
await queue.saveAsync(
'test2',
EQueueType.PRIORITY_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await consumer.consumeAsync('test2', (msg, cb) => {
messages.push(msg);
cb();
});

await queue.saveAsync('test3', EQueueType.PRIORITY_QUEUE);
await queue.saveAsync(
'test3',
EQueueType.PRIORITY_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await consumer.consumeAsync('test3', (msg, cb) => {
messages.push(msg);
cb();
});

await queue.saveAsync('test4', EQueueType.PRIORITY_QUEUE);
await queue.saveAsync(
'test4',
EQueueType.PRIORITY_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await consumer.consumeAsync('test4', (msg, cb) => {
messages.push(msg);
cb();
});

await queue.saveAsync('test5', EQueueType.PRIORITY_QUEUE);
await queue.saveAsync(
'test5',
EQueueType.PRIORITY_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await consumer.consumeAsync('test5', (msg, cb) => {
messages.push(msg);
cb();
Expand Down
18 changes: 15 additions & 3 deletions tests/tests/consuming-messages/test00036.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,25 @@
import { ProducibleMessage } from '../../../src/lib/message/producible-message';
import { ProducerMessageNotPublishedError } from '../../../src/lib/producer/errors';
import { getProducer } from '../../common/producer';
import { EMessagePriority, EQueueType } from '../../../types';
import {
EMessagePriority,
EQueueDeliveryModel,
EQueueType,
} from '../../../types';
import { getQueue } from '../../common/queue';

test('Producing a message and expecting different kind of failures', async () => {
const queue = await getQueue();
await queue.saveAsync('test0', EQueueType.LIFO_QUEUE);
await queue.saveAsync('test1', EQueueType.PRIORITY_QUEUE);
await queue.saveAsync(
'test0',
EQueueType.LIFO_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);
await queue.saveAsync(
'test1',
EQueueType.PRIORITY_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
);

const producer = getProducer();
await producer.runAsync();
Expand Down
3 changes: 3 additions & 0 deletions tests/tests/consuming-messages/test00039.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ test('ConsumableMessage', async () => {
expect(consumableMessage.getScheduledDelay()).toBe(null);
expect(consumableMessage.getFanOut()).toBe(null);
expect(consumableMessage.getTopic()).toBe(null);
expect(consumableMessage.getTopic()).toBe(null);
expect(consumableMessage.getConsumerGroupId()).toBe(null);
expect(consumableMessage.toJSON()).toEqual({
createdAt: consumableMessage.getCreatedAt(),
exchange: consumableMessage.getExchange().toJSON(),
Expand All @@ -78,5 +80,6 @@ test('ConsumableMessage', async () => {
scheduledRepeatPeriod: consumableMessage.getScheduledRepeatPeriod(),
scheduledRepeat: consumableMessage.getScheduledRepeat(),
destinationQueue: consumableMessage.getDestinationQueue(),
consumerGroupId: consumableMessage.getConsumerGroupId(),
});
});
2 changes: 1 addition & 1 deletion tests/tests/deleting-messages/test00004.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { getQueuePendingMessages } from '../../common/queue-pending-messages';
import { getQueueAcknowledgedMessages } from '../../common/queue-acknowledged-messages';
import { getQueueMessages } from '../../common/queue-messages';
import { promisifyAll } from 'bluebird';
import { MessageNotFoundError } from '../../../src/lib/message/errors/message-not-found.error';
import { MessageNotFoundError } from '../../../src/lib/message/errors';
import { Message } from '../../../src/lib/message/message';

test('Combined test: Delete a dead-letter message. Check pending, acknowledged, and dead-letter message. Check queue metrics.', async () => {
Expand Down
Loading

0 comments on commit 11e2542

Please sign in to comment.