Skip to content

Commit

Permalink
✨ Refactor syncs
Browse files Browse the repository at this point in the history
  • Loading branch information
naelob committed Jun 26, 2024
1 parent 8f81e9b commit 7c5c29f
Show file tree
Hide file tree
Showing 702 changed files with 2,960 additions and 4,546 deletions.
6 changes: 1 addition & 5 deletions INTEGRATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,7 @@ Don't forget to add your service you've defined at step 1 inside the module unde

```ts
@Module({
imports: [
BullModule.registerQueue({
name: 'webhookDelivery',
}),
],

controllers: [ContactController],
providers: [
ContactService,
Expand Down
13 changes: 4 additions & 9 deletions packages/api/scripts/commonObject.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ import { LoggerService } from '@@core/logger/logger.service';
import { v4 as uuidv4 } from 'uuid';
import { ApiResponse } from '@@core/utils/types';
import { throwTypedError, Unified${VerticalCap}Error } from '@@core/utils/errors';
import { WebhookService } from '@@core/webhook/webhook.service';
import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service';
import { Unified${ObjectCap}Input, Unified${ObjectCap}Output } from '../types/model.unified';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
Expand Down Expand Up @@ -131,9 +131,8 @@ import { ApiResponse } from '@@core/utils/types';
import { v4 as uuidv4 } from 'uuid';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { ServiceRegistry } from '../services/registry.service';
import { ${VerticalCap}Object } from '@${VerticalLow}/@utils/@types';
import { WebhookService } from '@@core/webhook/webhook.service';
import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service';
import { Unified${ObjectCap}Output } from '../types/model.unified';
import { I${ObjectCap}Service } from '../types';
Expand Down Expand Up @@ -231,17 +230,13 @@ import { ServiceRegistry } from './services/registry.service';
import { EncryptionService } from '@@core/encryption/encryption.service';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { PrismaService } from '@@core/prisma/prisma.service';
import { WebhookService } from '@@core/webhook/webhook.service';
import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service';
import { BullModule } from '@nestjs/bull';
import { ConnectionUtils } from '@@core/connections/@utils';
import { ApiKeyAuthGuard } from '@@core/auth/guards/api-key.guard';
@Module({
imports: [
BullModule.registerQueue({
name: 'webhookDelivery',
}),
],
controllers: [${ObjectCap}Controller],
providers: [
${ObjectCap}Service,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { EnvironmentService } from '@@core/environment/environment.service';
import { LoggerService } from '@@core/logger/logger.service';
import { EncryptionError, throwTypedError } from '@@core/utils/errors';
import { EnvironmentService } from '@@core/@core-services/environment/environment.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { Injectable } from '@nestjs/common';
import * as crypto from 'crypto';

Expand Down
23 changes: 23 additions & 0 deletions packages/api/src/@core/@core-services/queues/queue.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { BullQueueService } from './shared.service';
import { Queues } from './types';

@Module({
imports: [
BullModule.registerQueue(
{
name: Queues.REMOTE_REAL_TIME_WEBHOOKS_RECEIVER,
},
{
name: Queues.PANORA_WEBHOOKS_SENDER,
},
{
name: Queues.SYNC_JOBS_WORKER,
},
),
],
providers: [BullQueueService],
exports: [BullQueueService],
})
export class BullQueueModule {}
44 changes: 44 additions & 0 deletions packages/api/src/@core/@core-services/queues/shared.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { Queues } from './types';

@Injectable()
export class BullQueueService {
constructor(
@InjectQueue(Queues.REMOTE_REAL_TIME_WEBHOOKS_RECEIVER)
public readonly realTimeWebhookQueue: Queue,
@InjectQueue(Queues.PANORA_WEBHOOKS_SENDER)
public readonly panoraWebhookDeliveryQueue: Queue,
@InjectQueue(Queues.SYNC_JOBS_WORKER)
public readonly syncJobsQueue: Queue,
) {}

// getters

getRealtimeWebhookReceiver() {
return this.realTimeWebhookQueue;
}
getPanoraWebhookSender() {
return this.panoraWebhookDeliveryQueue;
}

// setters
async queueSyncJob(jobName: string, cron: string) {
const jobs = await this.syncJobsQueue.getRepeatableJobs();
for (const job of jobs) {
if (job.name === jobName) {
await this.syncJobsQueue.removeRepeatableByKey(job.key);
}
}
// Add new job with the job name
await this.syncJobsQueue.add(
jobName,
{},
{
repeat: { cron },
jobId: jobName, // Using jobId to identify repeatable jobs
},
);
}
}
5 changes: 5 additions & 0 deletions packages/api/src/@core/@core-services/queues/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export enum Queues {
REMOTE_REAL_TIME_WEBHOOKS_RECEIVER = 'REMOTE_REAL_TIME_WEBHOOKS_RECEIVER', // Queue receives real time webhooks coming from remote 3rd parties
PANORA_WEBHOOKS_SENDER = 'PANORA_WEBHOOKS_SENDER', // Queue sends Panora webhooks to clients listening for important events
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Injectable } from '@nestjs/common';
import { IConnectionCategory } from '@@core/connections/@utils/types';

@Injectable()
export class CategoryConnectionRegistry<T = IConnectionCategory> {
private serviceMap: Map<string, T>;

constructor() {
this.serviceMap = new Map<string, T>();
}

registerService(serviceKey: string, service: T) {
this.serviceMap.set(serviceKey, service);
}

getService(category: string): T {
const service = this.serviceMap.get(category);
if (!service) {
throw new Error();
}
return service;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common';
import { IUnification } from '../types/interface';
import { IUnification } from '../../utils/types/interface';

@Injectable()
export class UnificationRegistry<T extends IUnification> {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
import { AccountingObject } from '@accounting/@lib/@types';
import { AtsObject } from '@ats/@lib/@types';
import { CrmObject } from '@crm/@lib/@types';
import { TargetObject, Unified, UnifyReturnType } from '../types';
import { TicketingObject } from '@ticketing/@lib/@types';
import { UnifySourceType } from '../types/unify.output';
import { ConnectorCategory } from '@panora/shared';
import { FileStorageObject } from '@filestorage/@lib/@types';
import { MarketingAutomationObject } from '@marketingautomation/@lib/@types';
import { HrisObject } from '@hris/@lib/@types';
import { AccountingObject } from '@accounting/@lib/@types';
import { AtsObject } from '@ats/@lib/@types';
import { MarketingAutomationObject } from '@marketingautomation/@lib/@types';
import { Injectable } from '@nestjs/common';
import { UnificationRegistry } from '../registry/unification.registry';
import { CrmUnificationService } from '@crm/@lib/@unification';
import { AtsUnificationService } from '@ats/@lib/@unification';
import { DesunifyReturnType } from '../types/desunify.input';
import { AccountingUnificationService } from '@accounting/@lib/@unification';
import { FileStorageUnificationService } from '@filestorage/@lib/@unification';
import { HrisUnificationService } from '@hris/@lib/@unification';
import { MarketingAutomationUnificationService } from '@marketingautomation/@lib/@unification';
import { TicketingUnificationService } from '@ticketing/@lib/@unification';
import { ConnectorCategory } from '@panora/shared';
import { TicketingObject } from '@ticketing/@lib/@types';
import { TargetObject, Unified, UnifyReturnType } from '../../utils/types';
import { DesunifyReturnType } from '../../utils/types/desunify.input';
import { UnifySourceType } from '../../utils/types/unify.output';
import { UnificationRegistry } from '../registries/unification.registry';

@Injectable()
export class CoreUnification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
Request,
Delete,
} from '@nestjs/common';
import { LoggerService } from '@@core/logger/logger.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { ApiBody, ApiResponse, ApiTags, ApiOperation } from '@nestjs/swagger';
import { WebhookService } from './webhook.service';
import { SignatureVerificationDto, WebhookDto } from './dto/webhook.dto';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,16 @@
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { WebhookService } from './webhook.service';
import { PrismaService } from '@@core/prisma/prisma.service';
import { LoggerService } from '@@core/logger/logger.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { WebhookProcessor } from './webhook.processor';
import { WebhookController } from './webhook.controller';
import { ValidateUserService } from '@@core/utils/services/validateUser.service';
import { BullQueueModule } from '@@core/@core-services/queues/queue.module';
import { ValidateUserService } from '@@core/utils/services/validate-user.service';

@Module({
imports: [
BullModule.registerQueue({
name: 'webhookDelivery',
}),
],
imports: [BullQueueModule],
controllers: [WebhookController],
exports: [
BullModule.registerQueue({
name: 'webhookDelivery',
}),
],
providers: [
WebhookService,

LoggerService,
WebhookProcessor,
ValidateUserService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { LoggerService } from '@@core/logger/logger.service';
import { PrismaService } from '@@core/prisma/prisma.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { OnQueueActive, Process, Processor } from '@nestjs/bull';
import axios from 'axios';
import { Job } from 'bull';
import { v4 as uuidv4 } from 'uuid';
import { WebhookService } from './webhook.service';
import { Queues } from '@@core/@core-services/queues/types';

@Processor('webhookDelivery')
@Processor(Queues.PANORA_WEBHOOKS_SENDER)
export class WebhookProcessor {
constructor(
private logger: LoggerService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
import { PrismaService } from '@@core/prisma/prisma.service';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { v4 as uuidv4 } from 'uuid';
import { LoggerService } from '@@core/logger/logger.service';
import { throwTypedError, WebhooksError } from '@@core/utils/errors';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { WebhooksError } from '@@core/utils/errors';
import { WebhookDto } from './dto/webhook.dto';
import axios from 'axios';
import crypto from 'crypto';
import { BullQueueService } from '@@core/@core-services/queues/shared.service';

@Injectable()
export class WebhookService {
constructor(
@InjectQueue('webhookDelivery') private queue: Queue,
private readonly queues: BullQueueService,
private prisma: PrismaService,
private logger: LoggerService,
) {
Expand Down Expand Up @@ -137,7 +136,7 @@ export class WebhookService {
});
this.logger.log('adding webhook to the queue ');
// we send the delivery webhook to the queue so it can be processed by our dispatcher worker
const job = await this.queue.add({
const job = await this.queues.getPanoraWebhookSender().add({
webhook_delivery_id: w_delivery.id_webhook_delivery_attempt,
});
} catch (error) {
Expand Down Expand Up @@ -271,7 +270,7 @@ export class WebhookService {

async handleFailedWebhook(failed_id_delivery_webhook: string) {
try {
await this.queue.add(
await this.queues.getPanoraWebhookSender().add(
{
webhook_delivery_id: failed_id_delivery_webhook,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { Body, Controller, Post, Param, Headers } from '@nestjs/common';
import { LoggerService } from '@@core/logger/logger.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { ApiResponse, ApiTags, ApiOperation } from '@nestjs/swagger';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
import { BullQueueService } from '@@core/@core-services/queues/shared.service';

@ApiTags('mw')
@Controller('mw')
export class MWHandlerController {
constructor(
@InjectQueue('realTimeWebhookQueue') private queue: Queue,
private readonly queues: BullQueueService,
private loggerService: LoggerService,
) {
this.loggerService.setContext(MWHandlerController.name);
Expand All @@ -28,6 +27,6 @@ export class MWHandlerController {
this.loggerService.log(
'Realtime Webhook Received with Payload ---- ' + JSON.stringify(data),
);
await this.queue.add({ uuid, data, headers });
await this.queues.getRealtimeWebhookReceiver().add({ uuid, data, headers });
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { LoggerService } from '@@core/logger/logger.service';
import { PrismaService } from '@@core/prisma/prisma.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { Queues } from '@@core/@core-services/queues/types';
import { CrmWebhookHandlerService } from '@crm/@webhook/handler.service';
import { OnQueueActive, Process, Processor } from '@nestjs/bull';
import { ConnectorCategory } from '@panora/shared';
import { TicketingWebhookHandlerService } from '@ticketing/@webhook/handler.service';
import { Job } from 'bull';

@Processor('realTimeWebhookQueue')
@Processor(Queues.REMOTE_REAL_TIME_WEBHOOKS_RECEIVER)
export class MwHandlerProcessor {
constructor(
private logger: LoggerService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { JwtAuthGuard } from '@@core/auth/guards/jwt-auth.guard';
import {
Body,
Controller,
Get,
Param,
Post,
Put,
Param,
UseGuards,
} from '@nestjs/common';
import { LoggerService } from '@@core/logger/logger.service';
import { ApiBody, ApiResponse, ApiTags, ApiOperation } from '@nestjs/swagger';
import { ManagedWebhooksService } from './managed-webhooks.service';
import { ApiBody, ApiOperation, ApiResponse, ApiTags } from '@nestjs/swagger';
import {
ManagedWebhooksDto,
RemoteThirdPartyCreationDto,
} from './dto/managed-webhooks.dto';
import { JwtAuthGuard } from '@@core/auth/guards/jwt-auth.guard';
import { ManagedWebhooksService } from './managed-webhooks.service';

@ApiTags('managed-webhooks')
@Controller('managed-webhooks')
export class ManagedWebhooksController {
Expand Down
Loading

0 comments on commit 7c5c29f

Please sign in to comment.