Skip to content

Commit

Permalink
test: use typed events, remove legacy events
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Nov 26, 2023
1 parent 5f5a34f commit 5d85cb0
Show file tree
Hide file tree
Showing 25 changed files with 60 additions and 103 deletions.
9 changes: 4 additions & 5 deletions tests/common/base-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@
* in the root directory of this source tree.
*/

import { events } from '../../src/common/events/events';
import { Base } from '../../src/lib/base';

export async function shutDownBaseInstance(i: Base): Promise<void> {
if (i.isGoingUp()) {
await new Promise((resolve) => {
i.once(events.UP, resolve);
await new Promise<void>((resolve) => {
i.once('up', resolve);
});
}
if (i.isRunning()) {
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
i.shutdown(resolve);
await new Promise<void>((resolve) => {
i.shutdown(() => resolve());
});
}
}
13 changes: 5 additions & 8 deletions tests/common/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
*/

import { Consumer } from '../../src/lib/consumer/consumer';
import { events } from '../../src/common/events/events';
import { TRedisSMQEvent } from '../../types';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export async function consumerOnEvent<T extends Array<any>>(
consumer: Consumer,
event: string,
event: keyof TRedisSMQEvent,
) {
return new Promise<T>((resolve) => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
Expand All @@ -28,10 +28,7 @@ export async function untilMessageAcknowledged(
consumer: Consumer,
messageId?: string,
): Promise<void> {
const [id] = await consumerOnEvent<[string]>(
consumer,
events.MESSAGE_ACKNOWLEDGED,
);
const [id] = await consumerOnEvent<[string]>(consumer, 'messageAcknowledged');
if (messageId && messageId !== id) {
await untilMessageAcknowledged(consumer, messageId);
}
Expand All @@ -43,7 +40,7 @@ export async function untilMessageDeadLettered(
): Promise<void> {
const [, id] = await consumerOnEvent<[string, string]>(
consumer,
events.MESSAGE_DEAD_LETTERED,
'messageDeadLettered',
);
if (messageId && messageId !== id) {
await untilMessageDeadLettered(consumer, messageId);
Expand All @@ -52,7 +49,7 @@ export async function untilMessageDeadLettered(

export async function untilConsumerEvent(
consumer: Consumer,
event: string,
event: keyof TRedisSMQEvent,
): Promise<unknown[]> {
return consumerOnEvent(consumer, event);
}
3 changes: 1 addition & 2 deletions tests/common/message-producing-consuming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import { EQueueType, IQueueParams } from '../../types';
import { Message } from '../../src/lib/message/message';
import { events } from '../../src/common/events/events';
import { untilConsumerEvent, untilMessageAcknowledged } from './events';
import { getConsumer } from './consumer';
import { getProducer } from './producer';
Expand Down Expand Up @@ -63,7 +62,7 @@ export async function produceAndDeadLetterMessage(
const { messages } = await producer.produceAsync(message);

consumer.run();
await untilConsumerEvent(consumer, events.MESSAGE_DEAD_LETTERED);
await untilConsumerEvent(consumer, 'messageDeadLettered');
return { producer, consumer, messageId: messages[0], queue };
}

Expand Down
5 changes: 2 additions & 3 deletions tests/tests/consuming-messages/test00004.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import { delay } from 'bluebird';
import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -29,7 +28,7 @@ test('A message is dead-lettered and not delivered when messageTTL is exceeded',
const consume = jest.spyOn(consumer, 'consume');

let unacknowledged = 0;
consumer.on(events.MESSAGE_UNACKNOWLEDGED, () => {
consumer.on('messageUnacknowledged', () => {
unacknowledged += 1;
});

Expand All @@ -40,7 +39,7 @@ test('A message is dead-lettered and not delivered when messageTTL is exceeded',
await delay(5000);
consumer.run();

await untilConsumerEvent(consumer, events.MESSAGE_DEAD_LETTERED);
await untilConsumerEvent(consumer, 'messageDeadLettered');
expect(consume).toHaveBeenCalledTimes(0);
expect(unacknowledged).toBe(1);
const deadLetteredMessages = await getQueueDeadLetteredMessages();
Expand Down
5 changes: 2 additions & 3 deletions tests/tests/consuming-messages/test00005.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import { delay } from 'bluebird';
import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -29,7 +28,7 @@ test('Setting default message TTL from configuration', async () => {
const consume = jest.spyOn(consumer, 'consume');

let unacks = 0;
consumer.on(events.MESSAGE_UNACKNOWLEDGED, () => {
consumer.on('messageUnacknowledged', () => {
unacks += 1;
});
const msg = new Message();
Expand All @@ -39,7 +38,7 @@ test('Setting default message TTL from configuration', async () => {
await delay(5000);
consumer.run();

await untilConsumerEvent(consumer, events.MESSAGE_DEAD_LETTERED);
await untilConsumerEvent(consumer, 'messageDeadLettered');

expect(consume).toHaveBeenCalledTimes(0);
expect(unacks).toBe(1);
Expand Down
5 changes: 2 additions & 3 deletions tests/tests/consuming-messages/test00006.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { ICallback } from 'redis-smq-common';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
Expand Down Expand Up @@ -43,6 +42,6 @@ test('A message is unacknowledged when messageConsumeTimeout is exceeded', async

await producer.produceAsync(msg);
consumer.run();
await untilConsumerEvent(consumer, events.MESSAGE_UNACKNOWLEDGED);
await untilConsumerEvent(consumer, events.MESSAGE_ACKNOWLEDGED);
await untilConsumerEvent(consumer, 'messageUnacknowledged');
await untilConsumerEvent(consumer, 'messageAcknowledged');
});
5 changes: 2 additions & 3 deletions tests/tests/consuming-messages/test00007.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { ICallback } from 'redis-smq-common';
import { untilMessageAcknowledged } from '../../common/events';
import { getConsumer } from '../../common/consumer';
Expand All @@ -35,12 +34,12 @@ test('Unacknowledged message are re-queued when messageRetryThreshold is not exc
});

let unacknowledged = 0;
consumer.on(events.MESSAGE_UNACKNOWLEDGED, () => {
consumer.on('messageUnacknowledged', () => {
unacknowledged += 1;
});

let acknowledged = 0;
consumer.on(events.MESSAGE_ACKNOWLEDGED, () => {
consumer.on('messageAcknowledged', () => {
acknowledged += 1;
});

Expand Down
5 changes: 2 additions & 3 deletions tests/tests/consuming-messages/test00008.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { delay } from 'bluebird';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand Down Expand Up @@ -37,12 +36,12 @@ test('Async exceptions are caught when consuming a message', async () => {
});

let unacknowledged = 0;
consumer.on(events.MESSAGE_UNACKNOWLEDGED, () => {
consumer.on('messageUnacknowledged', () => {
unacknowledged += 1;
});

let acknowledged = 0;
consumer.on(events.MESSAGE_ACKNOWLEDGED, () => {
consumer.on('messageAcknowledged', () => {
acknowledged += 1;
});

Expand Down
3 changes: 1 addition & 2 deletions tests/tests/consuming-messages/test00009.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { delay } from 'bluebird';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand All @@ -30,7 +29,7 @@ test('A message is dead-lettered when messageRetryThreshold is exceeded', async
});

let unacknowledged = 0;
consumer.on(events.MESSAGE_UNACKNOWLEDGED, () => {
consumer.on('messageUnacknowledged', () => {
unacknowledged += 1;
});

Expand Down
9 changes: 4 additions & 5 deletions tests/tests/consuming-messages/test00011.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import { delay } from 'bluebird';
import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
import {
Expand All @@ -28,10 +27,10 @@ test('Given many consumers, a message is delivered only to one consumer', async
let unacks1 = 0;
let acks1 = 0;
consumer1
.on(events.MESSAGE_UNACKNOWLEDGED, () => {
.on('messageUnacknowledged', () => {
unacks1 += 1;
})
.on(events.MESSAGE_ACKNOWLEDGED, () => {
.on('messageAcknowledged', () => {
acks1 += 1;
});

Expand All @@ -46,10 +45,10 @@ test('Given many consumers, a message is delivered only to one consumer', async
let unacks2 = 0;
let acks2 = 0;
consumer2
.on(events.MESSAGE_UNACKNOWLEDGED, () => {
.on('messageUnacknowledged', () => {
unacks2 += 1;
})
.on(events.MESSAGE_ACKNOWLEDGED, () => {
.on('messageAcknowledged', () => {
acks2 += 1;
});

Expand Down
5 changes: 2 additions & 3 deletions tests/tests/consuming-messages/test00012.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { untilMessageAcknowledged } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand Down Expand Up @@ -36,12 +35,12 @@ test('An unacknowledged message is delayed given messageRetryDelay > 0 and messa
});

let unacks = 0;
consumer.on(events.MESSAGE_UNACKNOWLEDGED, () => {
consumer.on('messageUnacknowledged', () => {
unacks += 1;
});

let acks = 0;
consumer.on(events.MESSAGE_ACKNOWLEDGED, () => {
consumer.on('messageAcknowledged', () => {
acks += 1;
});

Expand Down
5 changes: 2 additions & 3 deletions tests/tests/consuming-messages/test00013.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import { Message } from '../../../src/lib/message/message';
import { delay } from 'bluebird';
import { events } from '../../../src/common/events/events';
import { ICallback } from 'redis-smq-common';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand Down Expand Up @@ -44,7 +43,7 @@ test('Given many queues, a message is recovered from a consumer crash and re-que
cb();
},
});
queueAConsumer.on(events.MESSAGE_ACKNOWLEDGED, () => {
queueAConsumer.on('messageAcknowledged', () => {
defaultQueueMetrics.acks += 1;
});
await queueAConsumer.runAsync();
Expand All @@ -56,7 +55,7 @@ test('Given many queues, a message is recovered from a consumer crash and re-que
cb();
},
});
queueBConsumer.on(events.MESSAGE_ACKNOWLEDGED, () => {
queueBConsumer.on('messageAcknowledged', () => {
queueBMetrics.acks += 1;
});
await queueBConsumer.runAsync();
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/consuming-messages/test00014.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ test('Consume message from different queues and published by a single producer i
const consumer = getConsumer({
queue: `queUE_${i}`,
messageHandler: (msg, cb) => {
// message handlers start consuming message once started and before the consumer is fully started (when events.UP is emitted)
// untilMessageAcknowledged() may miss acknowledged events
// message handlers start consuming message once started and before the consumer is fully started (when event.UP is emitted)
// untilMessageAcknowledged() may miss acknowledged event
// As a workaround, adding a delay before acknowledging a message
setTimeout(cb, 10000);
},
Expand Down
3 changes: 1 addition & 2 deletions tests/tests/consuming-messages/test00019.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

import { Message } from '../../../index';
import { events } from '../../../src/common/events/events';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand Down Expand Up @@ -38,7 +37,7 @@ test('An unacknowledged message is dead-lettered and not delivered again, given
await producer.produceAsync(msg);

consumer.run();
await untilConsumerEvent(consumer, events.MESSAGE_DEAD_LETTERED);
await untilConsumerEvent(consumer, 'messageDeadLettered');
const deadLetteredMessages = await getQueueDeadLetteredMessages();
const r = await deadLetteredMessages.getMessagesAsync(defaultQueue, 0, 100);
expect(r.items.length).toBe(1);
Expand Down
3 changes: 1 addition & 2 deletions tests/tests/consuming-messages/test00022.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { untilConsumerEvent } from '../../common/events';
import { getConsumer } from '../../common/consumer';
import { getProducer } from '../../common/producer';
Expand Down Expand Up @@ -36,7 +35,7 @@ test('Shutdown a consumer when consuming a message with retryThreshold = 0: expe
await producer.produceAsync(msg);

consumer.run();
await untilConsumerEvent(consumer, events.DOWN);
await untilConsumerEvent(consumer, 'down');
const deadLetteredMessages = await getQueueDeadLetteredMessages();
const res = await deadLetteredMessages.getMessagesAsync(defaultQueue, 0, 100);
expect(res.totalItems).toBe(1);
Expand Down
3 changes: 1 addition & 2 deletions tests/tests/consuming-messages/test00023.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import {
} from '../../common/message-producing-consuming';
import { getQueueDeadLetteredMessages } from '../../common/queue-dead-lettered-messages';
import { untilConsumerEvent } from '../../common/events';
import { events } from '../../../src/common/events/events';

test('Messages produced from scheduled message are processed like normal message upon consume failures (retry, delay, requeue, etc)', async () => {
await createQueue(defaultQueue, false);
Expand All @@ -39,7 +38,7 @@ test('Messages produced from scheduled message are processed like normal message
await producer.produceAsync(msg);

consumer.run();
await untilConsumerEvent(consumer, events.MESSAGE_DEAD_LETTERED);
await untilConsumerEvent(consumer, 'messageDeadLettered');
const deadLetteredMessages = await getQueueDeadLetteredMessages();
const res = await deadLetteredMessages.getMessagesAsync(defaultQueue, 0, 100);
expect(res.totalItems).toBe(1);
Expand Down
3 changes: 1 addition & 2 deletions tests/tests/consuming-messages/test00035.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import { delay, promisifyAll } from 'bluebird';
import { Consumer } from '../../../src/lib/consumer/consumer';
import { Message } from '../../../src/lib/message/message';
import { events } from '../../../src/common/events/events';
import { getProducer } from '../../common/producer';
import { shutDownBaseInstance } from '../../common/base-instance';
import { EQueueType } from '../../../types';
Expand All @@ -28,7 +27,7 @@ test('Consume message from different queues using a single consumer instance: ca
await queue.saveAsync('test0', EQueueType.LIFO_QUEUE);
await consumer.consumeAsync('test0', () => void 0);

consumer.once(events.MESSAGE_RECEIVED, () => {
consumer.once('messageReceived', () => {
setTimeout(() => {
// cancelling a queue when a message handler is active
consumer.cancelAsync('test0').catch((e: unknown) => {
Expand Down

0 comments on commit 5d85cb0

Please sign in to comment.