Skip to content

Commit

Permalink
add deadLetter event, add group to error event signature
Browse files Browse the repository at this point in the history
  • Loading branch information
morris committed Sep 17, 2023
1 parent 849bde4 commit e8372b1
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 16 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## NEXT

- Add `group` to consumer event callback signature
- Add `deadLetter` event to handle permanently failed messages

## 1.0.0

- BREAKING CHANGES
Expand Down
18 changes: 10 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,14 @@ await messageQueue.publish({ type: "input", data: "world" });

## Usage

(See [API documentation](https://morris.github.io/mongomq2/)
for a detailed reference of all configuration and functionalities.)

### Setup

```ts
const messageQueue = new MessageQueue(collection);

const messageQueue = new MessageQueue(collection, {
filter: {
// optional global filter applied on all subscribers and consumers
},
});
const messageCollection = mongoClient.db().collection<MyMessage>("messages");
const messageQueue = new MessageQueue(messageCollection);
```

### Publishing
Expand Down Expand Up @@ -161,6 +159,10 @@ messageQueue.consume(
},
},
);

messageQueue.on("deadLetter", (err, message, group) => {
// handle dead letter, i.e. message that failed repeatedly and exhausted maxRetries
});
```

- Consumes future and past matching messages.
Expand Down Expand Up @@ -211,7 +213,7 @@ Useful for:
### Additional Notes

- All MongoMQ2 clients are `EventEmitters`.
- Always attach `.on('error', (err, message?) => /* report error */)` to monitor errors.
- Always attach `.on('error', (err, message?, group?) => /* report error */)` to monitor errors.
- Always `.close()` MongoMQ2 clients on shutdown (before closing the MongoClient).
- MongoMQ2 will try to finish open tasks with best effort.
- MongoDB change streams are only supported for MongoDB replica sets.
Expand Down
26 changes: 23 additions & 3 deletions src/Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,26 @@ export interface ConsumerOptions<TMessage extends WithOptionalObjectId> {

export interface ConsumerEvents<TMessage extends WithOptionalObjectId>
extends ErrorEvents<TMessage> {
drained: () => void;
deadLetter: (err: Error, message: WithId<TMessage>, group: string) => void;
drained: (group: string) => void;
}

export type ConsumerCallback<TMessage extends WithOptionalObjectId> = (
message: WithId<TMessage>,
) => void | Promise<void>;

export interface ConsumerMetadata {
_c?: Record<
string,
| {
v?: number;
r?: number;
a?: number;
}
| undefined
>;
}

export class Consumer<
TMessage extends WithOptionalObjectId,
> extends TypedEventEmitter<ConsumerEvents<TMessage>> {
Expand Down Expand Up @@ -195,10 +208,17 @@ export class Consumer<
// fast poll after successfully consumed message
this.nextTimeout.set(0, this.fastPollMs);
} catch (err) {
this.emit("error", err as Error, message as TMessage);
this.emit("error", err as Error, message as TMessage, this.group);

const metadata = message as ConsumerMetadata;
const retries = metadata._c?.[this.group]?.r ?? 0;

if (retries >= this.maxRetries) {
this.emit("deadLetter", err as Error, message, this.group);
}
}
} else {
this.emit("drained");
this.emit("drained", this.group);
}
});
} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion src/ErrorEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import type { Document, EventsDescription } from "mongodb";

export interface ErrorEvents<TMessage extends Document>
extends EventsDescription {
error: (err: Error, message?: TMessage) => void;
error: (err: Error, message?: TMessage, group?: string) => void;
}
11 changes: 8 additions & 3 deletions src/MessageQueue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Collection, Filter, OptionalUnlessRequiredId } from "mongodb";
import { Collection, Filter, OptionalUnlessRequiredId, WithId } from "mongodb";
import {
BatchPublisher,
BatchPublisherEvents,
Expand Down Expand Up @@ -95,8 +95,13 @@ export class MessageQueue<
},
);

consumer.on("error", (err, message) => this.emit("error", err, message));
consumer.on("drained", () => this.emit("drained"));
consumer.on("error", (err, message, group) =>
this.emit("error", err, message, group),
);
consumer.on("deadLetter", (err, message, group) =>
this.emit("deadLetter", err, message as WithId<TMessage>, group),
);
consumer.on("drained", (group) => this.emit("drained", group));

this.consumers.push(consumer as unknown as Consumer<TMessage>);

Expand Down
65 changes: 65 additions & 0 deletions test/Consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Publisher } from "../src";
import {
NumericTestMessage,
TestFailure,
TestMessage,
TestUtil,
TextTestMessage,
} from "./testUtil";
Expand Down Expand Up @@ -323,4 +324,68 @@ describe("A Consumer", () => {
{ value: "should be consumed 3" },
]);
});

it("should emit deadLetter events when retries are exhausted", async () => {
const group = "testGroup";
const consumed: TestMessage[] = [];
let errors = 0;
const deadLetters: TestMessage[] = [];

const consumer = util.createConsumer(
(message) => {
if (message.value === "fail") {
throw new TestFailure("always fails");
}

consumed.push(message);
},
{
group,
maxRetries: 3,
pollMs: 10,
fastPollMs: 10,
visibilityTimeoutSeconds: 1,
},
);

consumer.on("error", () => ++errors);

consumer.on("deadLetter", (err, message) => {
deadLetters.push(message);
});

const publisher = util.createPublisher();

await publisher.publish({
type: "text",
value: "ok1",
});

await publisher.publish({
type: "text",
value: "ok2",
});

await publisher.publish({
type: "text",
value: "fail",
});

await publisher.publish({
type: "text",
value: "ok3",
});

await new Promise((resolve) => setTimeout(resolve, 5000));

expect(consumed).toMatchObject([
{ value: "ok1" },
{ value: "ok2" },
{ value: "ok3" },
]);

expect(errors).toBe(4); // 1 initial try + 3 retries
expect(deadLetters.length).toBe(1);
expect(deadLetters[0]).toMatchObject({ value: "fail" });
}, 300000);
});
10 changes: 9 additions & 1 deletion test/testUtil.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Collection, Filter, MongoClient } from "mongodb";
import { Collection, Filter, MongoClient, WithId } from "mongodb";
import {
BatchPublisher,
BatchPublisherOptions,
Expand Down Expand Up @@ -36,6 +36,7 @@ export class TestUtil {
public readonly subscribers: Subscriber<TestMessage>[] = [];
public readonly consumers: Consumer<TestMessage>[] = [];
public readonly emittedErrors: Error[] = [];
public readonly deadLetters: WithId<TestMessage>[] = [];

constructor(env: Record<string, string | undefined>) {
this.mongoClient = new MongoClient(
Expand Down Expand Up @@ -72,12 +73,14 @@ export class TestUtil {
// invariants
try {
expect(this.emittedErrors).toEqual([]);
expect(this.deadLetters).toEqual([]);
} finally {
this.publishers.length = 0;
this.batchPublishers.length = 0;
this.subscribers.length = 0;
this.consumers.length = 0;
this.emittedErrors.length = 0;
this.deadLetters.length = 0;
}
});

Expand Down Expand Up @@ -135,6 +138,11 @@ export class TestUtil {
this.emittedErrors.push(err);
});

consumer.on("deadLetter", (err, message) => {
if (err instanceof TestFailure) return;
this.deadLetters.push(message as WithId<TestMessage>);
});

consumer.start();

this.consumers.push(consumer as unknown as Consumer<TestMessage>);
Expand Down

0 comments on commit e8372b1

Please sign in to comment.