Skip to content

Commit

Permalink
add message queue integration
Browse files Browse the repository at this point in the history
  • Loading branch information
mahendraHegde committed Nov 14, 2023
1 parent f476129 commit a0a1028
Show file tree
Hide file tree
Showing 17 changed files with 389 additions and 34 deletions.
3 changes: 2 additions & 1 deletion server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ SIGN_IN_PREFILLED=true
# SUPPORT_FRONT_CHAT_ID=replace_me_with_front_chat_id
# LOGGER_DRIVER=console
# SENTRY_DSN=https://xxx@xxx.ingest.sentry.io/xxx
# LOG_LEVEL=error,warn
# LOG_LEVEL=error,warn
# MESSAGE_QUEUE_TYPE=pg-boss
3 changes: 2 additions & 1 deletion server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
"passport-local": "^1.0.0",
"patch-package": "^8.0.0",
"pg": "^8.11.3",
"pg-boss": "^9.0.3",
"postinstall-postinstall": "^2.1.0",
"prisma-graphql-type-decimal": "^3.0.0",
"reflect-metadata": "^0.1.13",
Expand Down Expand Up @@ -162,4 +163,4 @@
"schema": "src/database/schema.prisma",
"seed": "ts-node src/database/seeds/index.ts"
}
}
}
Original file line number Diff line number Diff line change
@@ -1,36 +1,71 @@
import { MigrationInterface, QueryRunner } from "typeorm";
import { MigrationInterface, QueryRunner } from 'typeorm';

export class SetupMetadataTables1699890270187 implements MigrationInterface {
name = 'SetupMetadataTables1699890270187'
name = 'SetupMetadataTables1699890270187';

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "metadata"."relationMetadata" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "relationType" character varying NOT NULL, "fromObjectMetadataId" uuid NOT NULL, "toObjectMetadataId" uuid NOT NULL, "fromFieldMetadataId" uuid NOT NULL, "toFieldMetadataId" uuid NOT NULL, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "REL_3deb257254145a3bdde9575e7d" UNIQUE ("fromFieldMetadataId"), CONSTRAINT "REL_9dea8f90d04edbbf9c541a95c3" UNIQUE ("toFieldMetadataId"), CONSTRAINT "PK_2724f60cb4f17a89481a7e8d7d3" PRIMARY KEY ("id"))`);
await queryRunner.query(`CREATE TYPE "metadata"."dataSource_type_enum" AS ENUM('postgres')`);
await queryRunner.query(`CREATE TABLE "metadata"."dataSource" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "url" character varying, "schema" character varying, "type" "metadata"."dataSource_type_enum" NOT NULL DEFAULT 'postgres', "label" character varying, "isRemote" boolean NOT NULL DEFAULT false, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "PK_6d01ae6c0f47baf4f8e37342268" PRIMARY KEY ("id"))`);
await queryRunner.query(`CREATE TABLE "metadata"."objectMetadata" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "dataSourceId" uuid NOT NULL, "nameSingular" character varying NOT NULL, "namePlural" character varying NOT NULL, "labelSingular" character varying NOT NULL, "labelPlural" character varying NOT NULL, "description" text, "icon" character varying, "targetTableName" character varying NOT NULL, "isCustom" boolean NOT NULL DEFAULT false, "isActive" boolean NOT NULL DEFAULT false, "isSystem" boolean NOT NULL DEFAULT false, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "IndexOnNamePluralAndWorkspaceIdUnique" UNIQUE ("namePlural", "workspaceId"), CONSTRAINT "IndexOnNameSingularAndWorkspaceIdUnique" UNIQUE ("nameSingular", "workspaceId"), CONSTRAINT "PK_81fb7f4f4244211cfbd188af1e8" PRIMARY KEY ("id"))`);
await queryRunner.query(`CREATE TABLE "metadata"."fieldMetadata" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "objectMetadataId" uuid NOT NULL, "type" character varying NOT NULL, "name" character varying NOT NULL, "label" character varying NOT NULL, "targetColumnMap" jsonb NOT NULL, "description" text, "icon" character varying, "enums" text array, "isCustom" boolean NOT NULL DEFAULT false, "isActive" boolean NOT NULL DEFAULT false, "isNullable" boolean DEFAULT true, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "IndexOnNameObjectMetadataIdAndWorkspaceIdUnique" UNIQUE ("name", "objectMetadataId", "workspaceId"), CONSTRAINT "PK_d046b1c7cea325ebc4cdc25e7a9" PRIMARY KEY ("id"))`);
await queryRunner.query(`CREATE TABLE "metadata"."tenantMigration" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "migrations" jsonb, "name" character varying, "isCustom" boolean NOT NULL DEFAULT false, "appliedAt" TIMESTAMP, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "PK_f9b06eb42494795f73acb5c2350" PRIMARY KEY ("id"))`);
await queryRunner.query(`ALTER TABLE "metadata"."relationMetadata" ADD CONSTRAINT "FK_f2a0acd3a548ee446a1a35df44d" FOREIGN KEY ("fromObjectMetadataId") REFERENCES "metadata"."objectMetadata"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`);
await queryRunner.query(`ALTER TABLE "metadata"."relationMetadata" ADD CONSTRAINT "FK_0f781f589e5a527b8f3d3a4b824" FOREIGN KEY ("toObjectMetadataId") REFERENCES "metadata"."objectMetadata"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`);
await queryRunner.query(`ALTER TABLE "metadata"."relationMetadata" ADD CONSTRAINT "FK_3deb257254145a3bdde9575e7d6" FOREIGN KEY ("fromFieldMetadataId") REFERENCES "metadata"."fieldMetadata"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`);
await queryRunner.query(`ALTER TABLE "metadata"."relationMetadata" ADD CONSTRAINT "FK_9dea8f90d04edbbf9c541a95c3b" FOREIGN KEY ("toFieldMetadataId") REFERENCES "metadata"."fieldMetadata"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`);
await queryRunner.query(`ALTER TABLE "metadata"."objectMetadata" ADD CONSTRAINT "FK_0b19dd17369574578bc18c405b2" FOREIGN KEY ("dataSourceId") REFERENCES "metadata"."dataSource"("id") ON DELETE CASCADE ON UPDATE NO ACTION`);
await queryRunner.query(`ALTER TABLE "metadata"."fieldMetadata" ADD CONSTRAINT "FK_de2a09b9e3e690440480d2dee26" FOREIGN KEY ("objectMetadataId") REFERENCES "metadata"."objectMetadata"("id") ON DELETE CASCADE ON UPDATE NO ACTION`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "metadata"."fieldMetadata" DROP CONSTRAINT "FK_de2a09b9e3e690440480d2dee26"`);
await queryRunner.query(`ALTER TABLE "metadata"."objectMetadata" DROP CONSTRAINT "FK_0b19dd17369574578bc18c405b2"`);
await queryRunner.query(`ALTER TABLE "metadata"."relationMetadata" DROP CONSTRAINT "FK_9dea8f90d04edbbf9c541a95c3b"`);
await queryRunner.query(`ALTER TABLE "metadata"."relationMetadata" DROP CONSTRAINT "FK_3deb257254145a3bdde9575e7d6"`);
await queryRunner.query(`ALTER TABLE "metadata"."relationMetadata" DROP CONSTRAINT "FK_0f781f589e5a527b8f3d3a4b824"`);
await queryRunner.query(`ALTER TABLE "metadata"."relationMetadata" DROP CONSTRAINT "FK_f2a0acd3a548ee446a1a35df44d"`);
await queryRunner.query(`DROP TABLE "metadata"."tenantMigration"`);
await queryRunner.query(`DROP TABLE "metadata"."fieldMetadata"`);
await queryRunner.query(`DROP TABLE "metadata"."objectMetadata"`);
await queryRunner.query(`DROP TABLE "metadata"."dataSource"`);
await queryRunner.query(`DROP TYPE "metadata"."dataSource_type_enum"`);
await queryRunner.query(`DROP TABLE "metadata"."relationMetadata"`);
}
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE TABLE "metadata"."relationMetadata" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "relationType" character varying NOT NULL, "fromObjectMetadataId" uuid NOT NULL, "toObjectMetadataId" uuid NOT NULL, "fromFieldMetadataId" uuid NOT NULL, "toFieldMetadataId" uuid NOT NULL, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "REL_3deb257254145a3bdde9575e7d" UNIQUE ("fromFieldMetadataId"), CONSTRAINT "REL_9dea8f90d04edbbf9c541a95c3" UNIQUE ("toFieldMetadataId"), CONSTRAINT "PK_2724f60cb4f17a89481a7e8d7d3" PRIMARY KEY ("id"))`,
);
await queryRunner.query(
`CREATE TYPE "metadata"."dataSource_type_enum" AS ENUM('postgres')`,
);
await queryRunner.query(
`CREATE TABLE "metadata"."dataSource" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "url" character varying, "schema" character varying, "type" "metadata"."dataSource_type_enum" NOT NULL DEFAULT 'postgres', "label" character varying, "isRemote" boolean NOT NULL DEFAULT false, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "PK_6d01ae6c0f47baf4f8e37342268" PRIMARY KEY ("id"))`,
);
await queryRunner.query(
`CREATE TABLE "metadata"."objectMetadata" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "dataSourceId" uuid NOT NULL, "nameSingular" character varying NOT NULL, "namePlural" character varying NOT NULL, "labelSingular" character varying NOT NULL, "labelPlural" character varying NOT NULL, "description" text, "icon" character varying, "targetTableName" character varying NOT NULL, "isCustom" boolean NOT NULL DEFAULT false, "isActive" boolean NOT NULL DEFAULT false, "isSystem" boolean NOT NULL DEFAULT false, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "IndexOnNamePluralAndWorkspaceIdUnique" UNIQUE ("namePlural", "workspaceId"), CONSTRAINT "IndexOnNameSingularAndWorkspaceIdUnique" UNIQUE ("nameSingular", "workspaceId"), CONSTRAINT "PK_81fb7f4f4244211cfbd188af1e8" PRIMARY KEY ("id"))`,
);
await queryRunner.query(
`CREATE TABLE "metadata"."fieldMetadata" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "objectMetadataId" uuid NOT NULL, "type" character varying NOT NULL, "name" character varying NOT NULL, "label" character varying NOT NULL, "targetColumnMap" jsonb NOT NULL, "description" text, "icon" character varying, "enums" text array, "isCustom" boolean NOT NULL DEFAULT false, "isActive" boolean NOT NULL DEFAULT false, "isNullable" boolean DEFAULT true, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "IndexOnNameObjectMetadataIdAndWorkspaceIdUnique" UNIQUE ("name", "objectMetadataId", "workspaceId"), CONSTRAINT "PK_d046b1c7cea325ebc4cdc25e7a9" PRIMARY KEY ("id"))`,
);
await queryRunner.query(
`CREATE TABLE "metadata"."tenantMigration" ("id" uuid NOT NULL DEFAULT uuid_generate_v4(), "migrations" jsonb, "name" character varying, "isCustom" boolean NOT NULL DEFAULT false, "appliedAt" TIMESTAMP, "workspaceId" character varying NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "PK_f9b06eb42494795f73acb5c2350" PRIMARY KEY ("id"))`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."relationMetadata" ADD CONSTRAINT "FK_f2a0acd3a548ee446a1a35df44d" FOREIGN KEY ("fromObjectMetadataId") REFERENCES "metadata"."objectMetadata"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."relationMetadata" ADD CONSTRAINT "FK_0f781f589e5a527b8f3d3a4b824" FOREIGN KEY ("toObjectMetadataId") REFERENCES "metadata"."objectMetadata"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."relationMetadata" ADD CONSTRAINT "FK_3deb257254145a3bdde9575e7d6" FOREIGN KEY ("fromFieldMetadataId") REFERENCES "metadata"."fieldMetadata"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."relationMetadata" ADD CONSTRAINT "FK_9dea8f90d04edbbf9c541a95c3b" FOREIGN KEY ("toFieldMetadataId") REFERENCES "metadata"."fieldMetadata"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."objectMetadata" ADD CONSTRAINT "FK_0b19dd17369574578bc18c405b2" FOREIGN KEY ("dataSourceId") REFERENCES "metadata"."dataSource"("id") ON DELETE CASCADE ON UPDATE NO ACTION`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."fieldMetadata" ADD CONSTRAINT "FK_de2a09b9e3e690440480d2dee26" FOREIGN KEY ("objectMetadataId") REFERENCES "metadata"."objectMetadata"("id") ON DELETE CASCADE ON UPDATE NO ACTION`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "metadata"."fieldMetadata" DROP CONSTRAINT "FK_de2a09b9e3e690440480d2dee26"`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."objectMetadata" DROP CONSTRAINT "FK_0b19dd17369574578bc18c405b2"`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."relationMetadata" DROP CONSTRAINT "FK_9dea8f90d04edbbf9c541a95c3b"`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."relationMetadata" DROP CONSTRAINT "FK_3deb257254145a3bdde9575e7d6"`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."relationMetadata" DROP CONSTRAINT "FK_0f781f589e5a527b8f3d3a4b824"`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."relationMetadata" DROP CONSTRAINT "FK_f2a0acd3a548ee446a1a35df44d"`,
);
await queryRunner.query(`DROP TABLE "metadata"."tenantMigration"`);
await queryRunner.query(`DROP TABLE "metadata"."fieldMetadata"`);
await queryRunner.query(`DROP TABLE "metadata"."objectMetadata"`);
await queryRunner.query(`DROP TABLE "metadata"."dataSource"`);
await queryRunner.query(`DROP TYPE "metadata"."dataSource_type_enum"`);
await queryRunner.query(`DROP TABLE "metadata"."relationMetadata"`);
}
}
8 changes: 8 additions & 0 deletions server/src/integrations/environment/environment.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AwsRegion } from './interfaces/aws-region.interface';
import { StorageType } from './interfaces/storage.interface';
import { SupportDriver } from './interfaces/support.interface';
import { LoggerDriver } from './interfaces/logger.interface';
import { MessageQueueType } from './interfaces/message-queue.interface';

@Injectable()
export class EnvironmentService {
Expand Down Expand Up @@ -102,6 +103,13 @@ export class EnvironmentService {
);
}

getMessageQueueType(): MessageQueueType {
return (
this.configService.get<MessageQueueType>('MESSAGE_QUEUE_TYPE') ??
MessageQueueType.Local
);
}

getStorageS3Region(): AwsRegion | undefined {
return this.configService.get<AwsRegion>('STORAGE_S3_REGION');
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export enum MessageQueueType {
PgBoss = 'pg-boss',
Local = 'local',
}
40 changes: 40 additions & 0 deletions server/src/integrations/integrations.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { StorageType } from './environment/interfaces/storage.interface';
import { LoggerModule } from './logger/logger.module';
import { LoggerModuleOptions } from './logger/interfaces';
import { LoggerDriver } from './environment/interfaces/logger.interface';
import { MessageQueueModule } from './message-queue/message-queue.module';
import { MessageQueueModuleOptions } from './message-queue/interfaces';
import { MessageQueueType } from './environment/interfaces/message-queue.interface';

/**
* FileStorage Module factory
Expand Down Expand Up @@ -84,6 +87,39 @@ const loggerModuleFactory = async (
}
};

/**
* MessageQueue Module factory
* @param environment
* @returns MessageQueueModuleOptions
*/
const messageQueueModuleFactory = async (
environmentService: EnvironmentService,
): Promise<MessageQueueModuleOptions> => {
const type = environmentService.getMessageQueueType();

switch (type) {
case MessageQueueType.Local: {
return {
type: MessageQueueType.Local,
options: {},
};
}
case MessageQueueType.PgBoss: {
const connectionString = environmentService.getPGDatabaseUrl();
return {
type: MessageQueueType.PgBoss,
options: {
connectionString,
},
};
}
default:
throw new Error(
`Invalid message queue type (${type}), check your .env file`,
);
}
};

@Module({
imports: [
EnvironmentModule.forRoot({}),
Expand All @@ -95,6 +131,10 @@ const loggerModuleFactory = async (
useFactory: loggerModuleFactory,
inject: [EnvironmentService],
}),
MessageQueueModule.forRoot({
useFactory: messageQueueModuleFactory,
inject: [EnvironmentService],
}),
],
exports: [],
providers: [],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export interface MessageQueueDriver {
publish<T>(topic: string, data: T): Promise<void>;
subscribe<T>(
topic: string,
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
);
}
22 changes: 22 additions & 0 deletions server/src/integrations/message-queue/drivers/local.driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { EventEmitter } from 'stream';

import { v4 } from 'uuid';

import { MessageQueueDriver } from './interfaces/message-queue-driver.interface';

export class LocalDriver extends EventEmitter implements MessageQueueDriver {
constructor() {
super();
}

async subscribe<T>(
topic: string,
handler: ({ data, id }: { data: T; id: string }) => Promise<void>,
) {
this.on(topic, handler);
}

async publish<T>(topic: string, data: T): Promise<void> {
this.emit(topic, { data, id: v4() });
}
}
27 changes: 27 additions & 0 deletions server/src/integrations/message-queue/drivers/pg-boss.driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import PgBoss from 'pg-boss';

import { MessageQueueDriver } from './interfaces/message-queue-driver.interface';

export type PgBossDriverOptions = PgBoss.ConstructorOptions;

export class PgBossDriver implements MessageQueueDriver {
private pgBoss: PgBoss;

constructor(options: PgBossDriverOptions) {
this.pgBoss = new PgBoss(options);
}

async init(): Promise<void> {
await this.pgBoss.start();
}
async subscribe<T>(
topic: string,
handler: ({ data, id }: { data: T; id: string }) => Promise<void>,
) {
this.pgBoss.work(topic, handler);
}

async publish<T>(topic: string, data: T): Promise<void> {
await this.pgBoss.send(topic, data as object);
}
}
1 change: 1 addition & 0 deletions server/src/integrations/message-queue/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './message-queue.interface';
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { FactoryProvider, ModuleMetadata } from '@nestjs/common';

import { MessageQueueType } from 'src/integrations/environment/interfaces/message-queue.interface';

import { PgBossDriverOptions } from 'src/integrations/message-queue/drivers/pg-boss.driver';

export interface PgBossDriverFactoryOptions {
type: MessageQueueType.PgBoss;
options: PgBossDriverOptions;
}

export interface LocalDriverFactoryOptions {
type: MessageQueueType.Local;
options: object;
}

export type MessageQueueModuleOptions =
| PgBossDriverFactoryOptions
| LocalDriverFactoryOptions;

export type MessageQueueModuleAsyncOptions = {
useFactory: (
...args: any[]
) => MessageQueueModuleOptions | Promise<MessageQueueModuleOptions>;
} & Pick<ModuleMetadata, 'imports'> &
Pick<FactoryProvider, 'inject'>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const QUEUE_DRIVER = Symbol('QUEUE_DRIVER');

export enum QUEUE_TOPICS {
taskAssignedQueueTopic = 'task-assigned-topic',
}
Loading

0 comments on commit a0a1028

Please sign in to comment.