Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MET-32 Update docs #149

Merged
merged 2 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 20 additions & 61 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@ It consists of the following submodules:

### Publishers

`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocol.
`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocols.
They implement the following public methods:

* `constructor()`, which accepts the following parameters:
* `dependencies` – a set of dependencies depending on the protocol;
* `options`, composed by
* `messageSchemas` – the `zod` schemas for all supported messages;
* `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer;
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer;
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`;
* `deletionConfig` - automatic cleanup of resources;
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `init()`, prepare publisher for use (e. g. establish all necessary connections);
* `close()`, stop publisher use (e. g. disconnect);
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
Expand All @@ -42,7 +45,7 @@ They implement the following public methods:

### Consumers

`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol.
`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocols.
They expose the following public methods:

Multi-schema consumers support multiple message types via handler configs. They expose the following public methods:
Expand All @@ -58,17 +61,19 @@ Multi-schema consumers support multiple message types via handler configs. They
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `subscriptionConfig` - SNS SQS consumer only - configuration for SNS -> SQS subscription to create, if one doesn't exist.
* `deletionConfig` - automatic cleanup of resources;
* `consumerOverrides` – available only for SQS consumers;
* `deadLetterQueue` - available only for SQS and SNS consumers; please read the section below to understand how to use it.
* `subscribedToTopic` – parameters for a topic to use during creation if it does not exist. Ignored if `queueLocator.subscriptionArn` is set. Available only for SNS consumers;
* `deadLetterQueue` - available only for SQS and SNS consumers (see [Dead Letter Queue](#dead-letter-queue) for more information);
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;
* `start()`, which invokes `init()`.

> **_NOTE:_** See [SqsPermissionConsumer.ts](./packages/sqs/test/consumers/SqsPermissionConsumer.ts) for a practical example.


##### How to define a handler
#### Handlers

You can define handlers for each of the supported messages in a type-safe way using the MessageHandlerConfigBuilder.

Expand All @@ -80,9 +85,10 @@ type ExecutionContext = {
userService: UserService
}

export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
export class SqsPermissionConsumer extends AbstractSqsConsumer<
SupportedMessages,
ExecutionContext
ExecutionContext,
PreHandlerContext
> {
constructor(
dependencies: SQSConsumerDependencies,
Expand All @@ -94,7 +100,8 @@ export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
//
handlers: new MessageHandlerConfigBuilder<
SupportedMessages,
ExecutionContext
ExecutionContext,
PreHandlerContext
>()
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
Expand Down Expand Up @@ -149,7 +156,7 @@ Then the message is automatically nacked without requeueing by the abstract cons

> **_NOTE:_** See [userConsumerSchemas.ts](./packages/sqs/test/consumers/userConsumerSchemas.ts) and [SqsPermissionsConsumer.spec.ts](./packages/sqs/test/consumers/SqsPermissionsConsumer.spec.ts) for a practical example.

### Barrier pattern
### Barrier Pattern
The barrier pattern facilitates the out-of-order message handling by retrying the message later if the system is not yet in the proper state to be able to process that message (e. g. some prerequisite messages have not yet arrived).

To enable this pattern you should define `preHandlerBarrier` on your message handler in order to define the conditions for starting to process the message.
Expand Down Expand Up @@ -182,9 +189,9 @@ Both publishers and consumers accept a queue name and configuration as parameter

If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist.

## Handler spies
## Handler Spies

In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into message-queue-toolkit directly.
In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into `message-queue-toolkit` directly.

In order to enable this functionality, configure spyHandler on the publisher or consumer:

Expand Down Expand Up @@ -235,51 +242,3 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1')
expect(result.processingResult).toEqual('consumed')
```

## Automatic Reconnects (RabbitMQ)

`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism.

Example:

```ts
export const TEST_AMQP_CONFIG: AmqpConfig = {
vhost: '',
hostname: 'localhost',
username: 'guest',
password: 'guest',
port: 5672,
useTls: false,
}

const amqpConnectionManager = new AmqpConnectionManager(config, logger)
await amqpConnectionManager.init()

const publisher = new TestAmqpPublisher(
{ amqpConnectionManager },
{
/// other amqp options
})
await publisher.init()

const consumer = new TestAmqpConsumer(
{ amqpConnectionManager },
{
/// other amqp options
})
await consumer.start()

// break connection, to simulate unexpected disconnection in production
await (await amqpConnectionManager.getConnection()).close()

const message = {
// some test message
}

// This will fail, but will trigger reconnection within amqpConnectionManager
publisher.publish(message)

// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager

// This will succeed and consumer, which also received new connection, will be able to consume it
publisher.publish(message)
```
67 changes: 67 additions & 0 deletions packages/amqp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# AMQP (Advanced Message Queuing Protocol)

dariacm marked this conversation as resolved.
Show resolved Hide resolved
The library provides support for both direct exchanges and topic exchanges.

> **_NOTE:_** Check [README.md](../../README.md) for transport-agnostic library documentation.
>

## Publishers

Use `AbstractAmqpQueuePublisher` to implement direct exchange and `AbstractAmqpTopicPublisher` for topic exchange.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also need to document out PublisherManager functionality, but that's a separate big topic

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created a separate ticket to not stall this one -> https://lokalise.atlassian.net/browse/MET-53


See [test publisher](test/publishers/AmqpPermissionPublisher.ts) for an example of implementation.

## Consumers

Use `AbstractAmqpQueueConsumer` to implement direct exchange and `AbstractAmqpTopicConsumer` for topic exchange.

See [test consumer](test/consumers/AmqpPermissionConsumer.ts) for an example of implementation.

## Automatic Reconnects

`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism.

Example:

```ts
export const TEST_AMQP_CONFIG: AmqpConfig = {
vhost: '',
hostname: 'localhost',
username: 'guest',
password: 'guest',
port: 5672,
useTls: false,
}

const amqpConnectionManager = new AmqpConnectionManager(config, logger)
await amqpConnectionManager.init()

const publisher = new TestAmqpPublisher(
{ amqpConnectionManager },
{
// other amqp options
})
await publisher.init()

const consumer = new TestAmqpConsumer(
{ amqpConnectionManager },
{
// other amqp options
})
await consumer.start()

// break connection, to simulate unexpected disconnection in production
await (await amqpConnectionManager.getConnection()).close()

const message = {
// some test message
}

// This will fail, but will trigger reconnection within amqpConnectionManager
publisher.publish(message)

// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager

// This will succeed and consumer, which also received new connection, will be able to consume it
publisher.publish(message)
```
Loading