diff --git a/backend/src/bin/jobs/cleanUp.ts b/backend/src/bin/jobs/cleanUp.ts index 0c2fdfa08d..cac6cf2021 100644 --- a/backend/src/bin/jobs/cleanUp.ts +++ b/backend/src/bin/jobs/cleanUp.ts @@ -3,6 +3,7 @@ import IncomingWebhookRepository from '../../database/repositories/incomingWebho import IntegrationRunRepository from '../../database/repositories/integrationRunRepository' import SequelizeRepository from '../../database/repositories/sequelizeRepository' import { CrowdJob } from '../../types/jobTypes' +import AuditLogRepository from '../../database/repositories/auditLogRepository' const MAX_MONTHS_TO_KEEP = 3 @@ -38,6 +39,13 @@ export const cleanUpOldWebhooks = async () => { await repo.cleanUpOldWebhooks(MAX_MONTHS_TO_KEEP) } +export const cleanUpOldAuditLogs = async () => { + const dbOptions = await SequelizeRepository.getDefaultIRepositoryOptions() + + log.info(`Cleaning up audit logs that are older than 1 month!`) + await AuditLogRepository.cleanUpOldAuditLogs(1, dbOptions) +} + const job: CrowdJob = { name: 'Clean up old data', // run once every week on Sunday at 1AM diff --git a/backend/src/database/models/index.ts b/backend/src/database/models/index.ts index 963ae3d5bb..4197e011e3 100644 --- a/backend/src/database/models/index.ts +++ b/backend/src/database/models/index.ts @@ -67,9 +67,9 @@ function models() { write: { host: DB_CONFIG.writeHost }, }, pool: { - max: SERVICE === configTypes.ServiceType.API ? 100 : 10, + max: SERVICE === configTypes.ServiceType.API ? 
20 : 10, min: 0, - acquire: 30000, + acquire: 50000, idle: 10000, }, logging: DB_CONFIG.logging diff --git a/backend/src/database/repositories/auditLogRepository.ts b/backend/src/database/repositories/auditLogRepository.ts index 28f25d3e26..6d669d2256 100644 --- a/backend/src/database/repositories/auditLogRepository.ts +++ b/backend/src/database/repositories/auditLogRepository.ts @@ -1,4 +1,4 @@ -import Sequelize from 'sequelize' +import Sequelize, { QueryTypes } from 'sequelize' import SequelizeRepository from './sequelizeRepository' import SequelizeFilterUtils from '../utils/sequelizeFilterUtils' import { IRepositoryOptions } from './IRepositoryOptions' @@ -54,6 +54,22 @@ export default class AuditLogRepository { return log } + static async cleanUpOldAuditLogs( + maxMonthsToKeep: number, + options: IRepositoryOptions, + ): Promise<void> { + const seq = SequelizeRepository.getSequelize(options) + + await seq.query( + ` + delete from "auditLogs" where timestamp < now() - interval '${maxMonthsToKeep} months' + `, + { + type: QueryTypes.DELETE, + }, + ) + } + static async findAndCountAll( { filter, limit = 0, offset = 0, orderBy = '' }, options: IRepositoryOptions, diff --git a/services/apps/data_sink_worker/src/main.ts b/services/apps/data_sink_worker/src/main.ts index d3c978daca..d549f9ff0e 100644 --- a/services/apps/data_sink_worker/src/main.ts +++ b/services/apps/data_sink_worker/src/main.ts @@ -7,12 +7,14 @@ import { initializeSentimentAnalysis } from '@crowd/sentiment' const log = getServiceLogger() +const MAX_CONCURRENT_PROCESSING = 2 + setImmediate(async () => { log.info('Starting data sink worker...') const sqsClient = getSqsClient(SQS_CONFIG()) - const dbConnection = getDbConnection(DB_CONFIG()) + const dbConnection = getDbConnection(DB_CONFIG(), MAX_CONCURRENT_PROCESSING) if (SENTIMENT_CONFIG()) { initializeSentimentAnalysis(SENTIMENT_CONFIG()) @@ -20,7 +22,13 @@ setImmediate(async () => { const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, log) - 
const queue = new WorkerQueueReceiver(sqsClient, dbConnection, nodejsWorkerEmitter, log) + const queue = new WorkerQueueReceiver( + sqsClient, + dbConnection, + nodejsWorkerEmitter, + log, + MAX_CONCURRENT_PROCESSING, + ) try { await nodejsWorkerEmitter.init() diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 2c4546ff58..505e1550b6 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -19,8 +19,9 @@ export class WorkerQueueReceiver extends SqsQueueReceiver { private readonly dbConn: DbConnection, private readonly nodejsWorkerEmitter: NodejsWorkerEmitter, parentLog: Logger, + maxConcurrentProcessing: number, ) { - super(client, DATA_SINK_WORKER_QUEUE_SETTINGS, 2, parentLog) + super(client, DATA_SINK_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog) } override async processMessage(message: IQueueMessage): Promise<void> { diff --git a/services/apps/integration_data_worker/src/main.ts b/services/apps/integration_data_worker/src/main.ts index 9ad4fb1637..59af307991 100644 --- a/services/apps/integration_data_worker/src/main.ts +++ b/services/apps/integration_data_worker/src/main.ts @@ -7,12 +7,14 @@ import { WorkerQueueReceiver } from './queue' const log = getServiceLogger() +const MAX_CONCURRENT_PROCESSING = 2 + setImmediate(async () => { log.info('Starting integration data worker...') const sqsClient = getSqsClient(SQS_CONFIG()) - const dbConnection = getDbConnection(DB_CONFIG()) + const dbConnection = getDbConnection(DB_CONFIG(), MAX_CONCURRENT_PROCESSING) const redisClient = await getRedisClient(REDIS_CONFIG(), true) const streamWorkerEmitter = new IntegrationStreamWorkerEmitter(sqsClient, log) @@ -25,6 +27,7 @@ setImmediate(async () => { streamWorkerEmitter, dataSinkWorkerEmitter, log, + MAX_CONCURRENT_PROCESSING, ) try { diff --git a/services/apps/integration_data_worker/src/queue/index.ts 
b/services/apps/integration_data_worker/src/queue/index.ts index b20fef7261..3b9e0fb4b8 100644 --- a/services/apps/integration_data_worker/src/queue/index.ts +++ b/services/apps/integration_data_worker/src/queue/index.ts @@ -23,8 +23,9 @@ export class WorkerQueueReceiver extends SqsQueueReceiver { private readonly streamWorkerEmitter: IntegrationStreamWorkerEmitter, private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter, parentLog: Logger, + maxConcurrentProcessing: number, ) { - super(client, INTEGRATION_DATA_WORKER_QUEUE_SETTINGS, 2, parentLog) + super(client, INTEGRATION_DATA_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog) } override async processMessage(message: IQueueMessage): Promise<void> { diff --git a/services/apps/integration_run_worker/src/main.ts b/services/apps/integration_run_worker/src/main.ts index 58c2ac17ed..e92d1fbba6 100644 --- a/services/apps/integration_run_worker/src/main.ts +++ b/services/apps/integration_run_worker/src/main.ts @@ -11,12 +11,14 @@ import { ApiPubSubEmitter, getRedisClient } from '@crowd/redis' const log = getServiceLogger() +const MAX_CONCURRENT_PROCESSING = 2 + setImmediate(async () => { log.info('Starting integration run worker...') const sqsClient = getSqsClient(SQS_CONFIG()) - const dbConnection = getDbConnection(DB_CONFIG()) + const dbConnection = getDbConnection(DB_CONFIG(), MAX_CONCURRENT_PROCESSING) const redisClient = await getRedisClient(REDIS_CONFIG(), true) const runWorkerEmitter = new IntegrationRunWorkerEmitter(sqsClient, log) @@ -32,6 +34,7 @@ setImmediate(async () => { runWorkerEmitter, apiPubSubEmitter, log, + MAX_CONCURRENT_PROCESSING, ) try { diff --git a/services/apps/integration_run_worker/src/queue/index.ts b/services/apps/integration_run_worker/src/queue/index.ts index 2c895a0951..94984023bd 100644 --- a/services/apps/integration_run_worker/src/queue/index.ts +++ b/services/apps/integration_run_worker/src/queue/index.ts @@ -28,8 +28,9 @@ export class WorkerQueueReceiver extends 
SqsQueueReceiver { private readonly runWorkerEmitter: IntegrationRunWorkerEmitter, private readonly apiPubSubEmitter: ApiPubSubEmitter, parentLog: Logger, + maxConcurrentProcessing: number, ) { - super(client, INTEGRATION_RUN_WORKER_QUEUE_SETTINGS, 2, parentLog) + super(client, INTEGRATION_RUN_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog) } override async processMessage(message: IQueueMessage): Promise<void> { diff --git a/services/apps/integration_stream_worker/src/main.ts b/services/apps/integration_stream_worker/src/main.ts index 3e1641a8fd..73be31b2a4 100644 --- a/services/apps/integration_stream_worker/src/main.ts +++ b/services/apps/integration_stream_worker/src/main.ts @@ -12,12 +12,14 @@ import { WorkerQueueReceiver } from './queue' const log = getServiceLogger() +const MAX_CONCURRENT_PROCESSING = 2 + setImmediate(async () => { log.info('Starting integration stream worker...') const sqsClient = getSqsClient(SQS_CONFIG()) - const dbConnection = getDbConnection(DB_CONFIG()) + const dbConnection = getDbConnection(DB_CONFIG(), MAX_CONCURRENT_PROCESSING) const redisClient = await getRedisClient(REDIS_CONFIG(), true) const runWorkerEmiiter = new IntegrationRunWorkerEmitter(sqsClient, log) @@ -32,6 +34,7 @@ setImmediate(async () => { dataWorkerEmitter, streamWorkerEmitter, log, + MAX_CONCURRENT_PROCESSING, ) try { diff --git a/services/apps/integration_stream_worker/src/queue/index.ts b/services/apps/integration_stream_worker/src/queue/index.ts index 577d23c2f2..491f08cf80 100644 --- a/services/apps/integration_stream_worker/src/queue/index.ts +++ b/services/apps/integration_stream_worker/src/queue/index.ts @@ -26,8 +26,9 @@ export class WorkerQueueReceiver extends SqsQueueReceiver { private readonly dataWorkerEmitter: IntegrationDataWorkerEmitter, private readonly streamWorkerEmitter: IntegrationStreamWorkerEmitter, parentLog: Logger, + maxConcurrentProcessing: number, ) { - super(client, INTEGRATION_STREAM_WORKER_QUEUE_SETTINGS, 2, parentLog) + 
super(client, INTEGRATION_STREAM_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog) } override async processMessage(message: IQueueMessage): Promise<void> { diff --git a/services/libs/database/src/connection.ts b/services/libs/database/src/connection.ts index ac4e99b24b..79d9db7cfb 100644 --- a/services/libs/database/src/connection.ts +++ b/services/libs/database/src/connection.ts @@ -44,7 +44,7 @@ export const getDbInstance = (): DbInstance => { let dbConnection: DbConnection | undefined -export const getDbConnection = (config: IDatabaseConfig): DbConnection => { +export const getDbConnection = (config: IDatabaseConfig, maxPoolSize?: number): DbConnection => { if (dbConnection) { return dbConnection } @@ -56,7 +56,10 @@ export const getDbConnection = (config: IDatabaseConfig): DbConnection => { const dbInstance = getDbInstance() - dbConnection = dbInstance(config) + dbConnection = dbInstance({ + ...config, + max: maxPoolSize || 5, + }) return dbConnection }