Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add message queue integration #2491

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs/docs/contributor/server/basics/queue.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
title: Message Queue
sidebar_position: 5
sidebar_custom_props:
icon: TbTopologyStar
---

Queues facilitate async operations to be performed.It could be used for performing background tasks such as sending a welcome email on register.
Each use case will have its own queue class extended from `MessageQueueServiceBase`.

Currently queue supports 2 drivers which can be configurred by env variable `MESSAGE_QUEUE_TYPE`

Check failure on line 11 in docs/docs/contributor/server/basics/queue.mdx

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [docs.Numbers] Numbers must be spelled out Raw Output: {"message": "[docs.Numbers] Numbers must be spelled out", "location": {"path": "docs/docs/contributor/server/basics/queue.mdx", "range": {"start": {"line": 11, "column": 1}}}, "severity": "ERROR"}
1. `pg-boss` this is the default driver, uses [pg-boss](https://github.com/timgit/pg-boss) under the hood.
2. `bull-mq` it uses [bull-mq](https://bullmq.io/) under the hood.

Steps to create and use a new queue
1. add a queue name for your new queue under enum `MESSAGE_QUEUES`.
2. provide factory implementation of the queue with queue name as dependency token.
3. inject the queue that you created in the required module/service with queue name as dependency token.
4. add worker class with token based injection just like producer.

### Example usage
```ts
class Resolver {
constructor(@Inject(MESSAGE_QUEUES.custom) private queue: MessageQueueService) {}

async onSomeAction() {
//business logic
await this.queue.add(someData);
}
}

//async worker
class CustomWorker {
constructor(@Inject(MESSAGE_QUEUES.custom) private queue: MessageQueueService) {
this.initWorker();
}

async initWorker() {
await this.queue.work(async ({ id, data }) => {
//worker logic
});
}
}

```

8 changes: 8 additions & 0 deletions docs/docs/start/self-hosting/enviroment-variables.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import OptionTable from '@site/src/theme/OptionTable'

<OptionTable options={[
['PG_DATABASE_URL', 'postgres://user:pw@localhost:5432/default?connection_limit=1', 'Database connection'],
['REDIS_HOST', '127.0.0.1', 'Redis connection host'],
['REDIS_PORT', '6379', 'Redis connection port'],
['FRONT_BASE_URL', 'http://localhost:3001', 'Url to the hosted frontend'],
['PORT', '3000', 'Port'],
]}></OptionTable>
Expand Down Expand Up @@ -61,6 +63,12 @@ import OptionTable from '@site/src/theme/OptionTable'
['STORAGE_LOCAL_PATH', '.local-storage', 'data path (local storage)'],
]}></OptionTable>

### Message Queue

<OptionTable options={[
['MESSAGE_QUEUE_TYPE', 'pg-boss', "Queue driver: 'pg-boss' or 'bull-mq'"],
]}></OptionTable>

### Logging

<OptionTable options={[
Expand Down
5 changes: 4 additions & 1 deletion server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ SIGN_IN_PREFILLED=true
# SUPPORT_FRONT_CHAT_ID=replace_me_with_front_chat_id
# LOGGER_DRIVER=console
# SENTRY_DSN=https://xxx@xxx.ingest.sentry.io/xxx
# LOG_LEVEL=error,warn
# LOG_LEVEL=error,warn
# MESSAGE_QUEUE_TYPE=pg-boss
# REDIS_HOST=127.0.0.1
# REDIS_PORT=6379
2 changes: 2 additions & 0 deletions server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"axios": "^1.4.0",
"bcrypt": "^5.1.1",
"body-parser": "^1.20.2",
"bullmq": "^4.14.0",
"bytes": "^3.1.2",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
Expand Down Expand Up @@ -89,6 +90,7 @@
"passport-local": "^1.0.0",
"patch-package": "^8.0.0",
"pg": "^8.11.3",
"pg-boss": "^9.0.3",
"postinstall-postinstall": "^2.1.0",
"rimraf": "^3.0.2",
"rxjs": "^7.2.0",
Expand Down
16 changes: 16 additions & 0 deletions server/src/integrations/environment/environment.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AwsRegion } from './interfaces/aws-region.interface';
import { StorageType } from './interfaces/storage.interface';
import { SupportDriver } from './interfaces/support.interface';
import { LoggerDriver } from './interfaces/logger.interface';
import { MessageQueueType } from './interfaces/message-queue.interface';
mahendraHegde marked this conversation as resolved.
Show resolved Hide resolved

@Injectable()
export class EnvironmentService {
Expand Down Expand Up @@ -37,6 +38,14 @@ export class EnvironmentService {
return this.configService.get<string>('PG_DATABASE_URL')!;
}

getRedisHost(): string {
return this.configService.get<string>('REDIS_HOST') ?? '127.0.0.1';
}

getRedisPort(): number {
return +(this.configService.get<string>('REDIS_PORT') ?? 6379);
}

getFrontBaseUrl(): string {
return this.configService.get<string>('FRONT_BASE_URL')!;
}
Expand Down Expand Up @@ -102,6 +111,13 @@ export class EnvironmentService {
);
}

getMessageQueueType(): MessageQueueType {
return (
this.configService.get<MessageQueueType>('MESSAGE_QUEUE_TYPE') ??
MessageQueueType.PgBoss
);
}

getStorageS3Region(): AwsRegion | undefined {
return this.configService.get<AwsRegion>('STORAGE_S3_REGION');
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export enum MessageQueueType {
PgBoss = 'pg-boss',
BullMQ = 'bull-mq',
}
47 changes: 47 additions & 0 deletions server/src/integrations/integrations.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { StorageType } from './environment/interfaces/storage.interface';
import { LoggerModule } from './logger/logger.module';
import { LoggerModuleOptions } from './logger/interfaces';
import { LoggerDriver } from './environment/interfaces/logger.interface';
import { MessageQueueModule } from './message-queue/message-queue.module';
import { MessageQueueModuleOptions } from './message-queue/interfaces';
import { MessageQueueType } from './environment/interfaces/message-queue.interface';

/**
* FileStorage Module factory
Expand Down Expand Up @@ -84,6 +87,46 @@ const loggerModuleFactory = async (
}
};

/**
* MessageQueue Module factory
* @param environment
* @returns MessageQueueModuleOptions
*/
const messageQueueModuleFactory = async (
environmentService: EnvironmentService,
): Promise<MessageQueueModuleOptions> => {
const type = environmentService.getMessageQueueType();

switch (type) {
case MessageQueueType.PgBoss: {
const connectionString = environmentService.getPGDatabaseUrl();
return {
type: MessageQueueType.PgBoss,
options: {
connectionString,
},
};
}
case MessageQueueType.BullMQ: {
const host = environmentService.getRedisHost();
const port = environmentService.getRedisPort();
return {
type: MessageQueueType.BullMQ,
options: {
connection: {
host,
port,
},
},
};
}
default:
throw new Error(
`Invalid message queue type (${type}), check your .env file`,
);
}
};

@Module({
imports: [
EnvironmentModule.forRoot({}),
Expand All @@ -95,6 +138,10 @@ const loggerModuleFactory = async (
useFactory: loggerModuleFactory,
inject: [EnvironmentService],
}),
MessageQueueModule.forRoot({
useFactory: messageQueueModuleFactory,
inject: [EnvironmentService],
}),
],
exports: [],
providers: [],
Expand Down
60 changes: 60 additions & 0 deletions server/src/integrations/message-queue/drivers/bullmq.driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Queue, QueueOptions, Worker } from 'bullmq';

import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';

import { MessageQueues } from 'src/integrations/message-queue/message-queue.constants';

import { MessageQueueDriver } from './interfaces/message-queue-driver.interface';

export type BullMQDriverOptions = QueueOptions;

export class BullMQDriver implements MessageQueueDriver {
private queueMap: Record<MessageQueues, Queue> = {} as Record<
MessageQueues,
Queue
>;
private workerMap: Record<MessageQueues, Worker> = {} as Record<
MessageQueues,
Worker
>;

constructor(private options: BullMQDriverOptions) {}

register(queueName: MessageQueues): void {
this.queueMap[queueName] = new Queue(queueName, this.options);
}

async stop() {
const workers = Object.values(this.workerMap);
const queues = Object.values(this.queueMap);
await Promise.all([
...queues.map((q) => q.close()),
...workers.map((w) => w.close()),
]);
}

async work<T>(
queueName: MessageQueues,
handler: ({ data, id }: { data: T; id: string }) => Promise<void>,
) {
const worker = new Worker(queueName, async (job) => {
await handler(job as { data: T; id: string });
});
this.workerMap[queueName] = worker;
}

async add<T>(
queueName: MessageQueues,
data: T,
options?: QueueJobOptions,
): Promise<void> {
if (!this.queueMap[queueName]) {
throw new Error(
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
);
}
await this.queueMap[queueName].add(options?.id || '', data, {
priority: options?.priority,
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface QueueJobOptions {
id?: string;
priority?: number;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';

import { MessageQueues } from 'src/integrations/message-queue/message-queue.constants';

export interface MessageQueueDriver {
add<T>(
queueName: MessageQueues,
data: T,
options?: QueueJobOptions,
): Promise<void>;
work<T>(
queueName: string,
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
);
stop?(): Promise<void>;
register?(queueName: MessageQueues): void;
}
37 changes: 37 additions & 0 deletions server/src/integrations/message-queue/drivers/pg-boss.driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import PgBoss from 'pg-boss';
mahendraHegde marked this conversation as resolved.
Show resolved Hide resolved

import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';

import { MessageQueueDriver } from './interfaces/message-queue-driver.interface';

export type PgBossDriverOptions = PgBoss.ConstructorOptions;

export class PgBossDriver implements MessageQueueDriver {
private pgBoss: PgBoss;

constructor(options: PgBossDriverOptions) {
this.pgBoss = new PgBoss(options);
}
async stop() {
await this.pgBoss.stop();
}

async init(): Promise<void> {
await this.pgBoss.start();
}

async work<T>(
queueName: string,
handler: ({ data, id }: { data: T; id: string }) => Promise<void>,
) {
return this.pgBoss.work(queueName, handler);
}

async add<T>(
queueName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
await this.pgBoss.send(queueName, data as object, options ? options : {});
}
}
1 change: 1 addition & 0 deletions server/src/integrations/message-queue/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './message-queue.interface';
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { FactoryProvider, ModuleMetadata } from '@nestjs/common';

import { MessageQueueType } from 'src/integrations/environment/interfaces/message-queue.interface';

import { BullMQDriverOptions } from 'src/integrations/message-queue/drivers/bullmq.driver';
import { PgBossDriverOptions } from 'src/integrations/message-queue/drivers/pg-boss.driver';

export interface PgBossDriverFactoryOptions {
type: MessageQueueType.PgBoss;
options: PgBossDriverOptions;
}

export interface BullMQDriverFactoryOptions {
type: MessageQueueType.BullMQ;
options: BullMQDriverOptions;
}

export type MessageQueueModuleOptions =
| PgBossDriverFactoryOptions
| BullMQDriverFactoryOptions;

export type MessageQueueModuleAsyncOptions = {
useFactory: (
...args: any[]
) => MessageQueueModuleOptions | Promise<MessageQueueModuleOptions>;
} & Pick<ModuleMetadata, 'imports'> &
Pick<FactoryProvider, 'inject'>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const QUEUE_DRIVER = Symbol('QUEUE_DRIVER');

export enum MessageQueues {
taskAssignedQueue = 'task-assigned-queue',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be less specific but we can work on that later :)
We should have a notificationQueue, paymentQueue, ...

}
Loading
Loading