From 7bf246a80c9e1beaab1df3259f9576399ce4fbdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 27 Mar 2023 07:42:21 +0200 Subject: [PATCH 1/5] integration streams --- .../U1679825091__stream-delaying.sql | 2 + .../V1679825091__stream-delaying.sql | 37 +++ .../integrationStreamRepository.ts | 279 ++++++++++++++++++ backend/src/types/integrationStreamTypes.ts | 27 ++ 4 files changed, 345 insertions(+) create mode 100644 backend/src/database/migrations/U1679825091__stream-delaying.sql create mode 100644 backend/src/database/migrations/V1679825091__stream-delaying.sql create mode 100644 backend/src/database/repositories/integrationStreamRepository.ts create mode 100644 backend/src/types/integrationStreamTypes.ts diff --git a/backend/src/database/migrations/U1679825091__stream-delaying.sql b/backend/src/database/migrations/U1679825091__stream-delaying.sql new file mode 100644 index 0000000000..282e3af1b0 --- /dev/null +++ b/backend/src/database/migrations/U1679825091__stream-delaying.sql @@ -0,0 +1,2 @@ +drop table "integrationStreams"; +drop table "integrationRuns"; \ No newline at end of file diff --git a/backend/src/database/migrations/V1679825091__stream-delaying.sql b/backend/src/database/migrations/V1679825091__stream-delaying.sql new file mode 100644 index 0000000000..aad86b0c74 --- /dev/null +++ b/backend/src/database/migrations/V1679825091__stream-delaying.sql @@ -0,0 +1,37 @@ +create table "integrationRuns" ( + id uuid not null, + "integrationId" uuid not null, + + onboarding boolean not null, + state varchar(255) not null, + + "processedAt" timestamptz null, + "streamCount" int null, + error json null, + + "createdAt" timestamptz not null default now(), + + foreign key ("integrationId") references integrations (id), + primary key (id) +); + +create table "integrationStreams" ( + id uuid not null, + "runId" uuid not null, + "integrationId" uuid not null, + + state varchar(255) not null, + + name text not null, + metadata json not null, 
+ + "processedAt" timestamptz null, + error json null, + + "createdAt" timestamptz not null default now(), + "updatedAt" timestamptz not null default now(), + + foreign key ("runId") references "integrationRuns" (id), + foreign key ("integrationId") references integrations (id), + primary key (id) +); \ No newline at end of file diff --git a/backend/src/database/repositories/integrationStreamRepository.ts b/backend/src/database/repositories/integrationStreamRepository.ts new file mode 100644 index 0000000000..f2e4a984de --- /dev/null +++ b/backend/src/database/repositories/integrationStreamRepository.ts @@ -0,0 +1,279 @@ +import { QueryTypes } from 'sequelize' +import { v1 as uuidV1 } from 'uuid' +import lodash from 'lodash' +import { + DbIntegrationStreamCreateData, + IntegrationStream, + IntegrationStreamState, +} from '../../types/integrationStreamTypes' +import { IRepositoryOptions } from './IRepositoryOptions' +import { RepositoryBase } from './repositoryBase' + +export default class IntegrationStreamRepository extends RepositoryBase< + IntegrationStream, + string, + DbIntegrationStreamCreateData, + unknown, + unknown +> { + public constructor(options: IRepositoryOptions) { + super(options, true) + } + + override async findById(id: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + select id, + "runId", + "integrationId", + state, + name, + metadata, + "processedAt", + error, + "createdAt" + from "integrationStreams" where id = :id; + ` + + const result = await seq.query(query, { + replacements: { + id, + }, + type: QueryTypes.SELECT, + transaction, + }) + + if (result.length !== 1) { + throw new Error(`Expected 1 row to be selected, got ${result.length} rows instead.`) + } + + return result[0] as IntegrationStream + } + + async findByRunId(runId: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + select id, + "runId", + "integrationId", + state, + name, 
+ metadata, + "processedAt", + error, + "createdAt" + from "integrationStreams" where "runId" = :runId; + ` + + const result = await seq.query(query, { + replacements: { + runId, + }, + type: QueryTypes.SELECT, + transaction, + }) + + return result as IntegrationStream[] + } + + async bulkCreate(data: DbIntegrationStreamCreateData[]): Promise { + const transaction = this.transaction + + const seq = this.seq + + const batches = lodash.chunk(data, 999) + + const results: IntegrationStream[] = [] + + const query = ` + insert into "integrationStreams"(id, "runId", "integrationId", state, name, metadata) + values + ` + + for (const batch of batches) { + let i = 0 + const values: string[] = [] + const replacements: any = {} + + for (const item of batch) { + const id = uuidV1() + values.push( + `(:id${i}, :runId${i}, :integrationId${i}, :state${i}, :name${i}, :metadata${i})`, + ) + replacements[`id${i}`] = id + replacements[`runId${i}`] = item.runId + replacements[`state${i}`] = IntegrationStreamState.PENDING + replacements[`integrationId${i}`] = item.integrationId + replacements[`name${i}`] = item.name + replacements[`metadata${i}`] = item.metadata + i++ + } + + const finalQuery = `${query} ${values.join(', ')} returning "createdAt";` + + const batchResults = await seq.query(finalQuery, { + replacements, + type: QueryTypes.SELECT, + transaction, + }) + + if (batchResults.length !== batch.length) { + throw new Error( + `Expected ${batch.length} rows to be inserted, got ${batchResults.length} rows instead.`, + ) + } + + for (let j = 0; j < batch.length; j++) { + const item = batch[j] + const createdAt = (batchResults[j] as any).createdAt + results.push({ + id: replacements[`id${j}`], + runId: item.runId, + state: IntegrationStreamState.PENDING, + integrationId: item.integrationId, + name: item.name, + metadata: item.metadata, + createdAt, + processedAt: null, + error: null, + }) + } + } + + return results + } + + override async create(data: DbIntegrationStreamCreateData): 
Promise { + const transaction = this.transaction + + const seq = this.seq + + const id = uuidV1() + + const query = ` + insert into "integrationStreams"(id, "runId", "integrationId", state, name, metadata) + values(:id, :runId, :integrationId, :state, :name, :metadata) + returning "createdAt"; + ` + + const result = await seq.query(query, { + replacements: { + id, + runId: data.runId, + state: IntegrationStreamState.PENDING, + integrationId: data.integrationId, + name: data.name, + metadata: data.metadata, + }, + type: QueryTypes.SELECT, + transaction, + }) + + if (result.length !== 1) { + throw new Error(`Expected 1 row to be inserted, got ${result.length} rows instead.`) + } + + return { + id, + runId: data.runId, + state: IntegrationStreamState.PENDING, + integrationId: data.integrationId, + name: data.name, + metadata: data.metadata, + createdAt: (result[0] as any).createdAt, + processedAt: null, + error: null, + } + } + + async markProcessing(id: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + update "integrationStreams" + set state = :state, + "updatedAt" = now() + where id = :id; + ` + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const [_, rowCount] = await seq.query(query, { + replacements: { + id, + state: IntegrationStreamState.PROCESSING, + }, + type: QueryTypes.UPDATE, + transaction, + }) + + if (rowCount !== 1) { + throw new Error(`Expected 1 row to be updated, got ${rowCount} rows instead.`) + } + } + + async markProcessed(id: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + update "integrationStreams" + set state = :state, + "processedAt" = now(), + "updatedAt" = now() + where id = :id; + ` + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const [_, rowCount] = await seq.query(query, { + replacements: { + id, + state: IntegrationStreamState.PROCESSED, + }, + type: QueryTypes.UPDATE, + transaction, + 
}) + + if (rowCount !== 1) { + throw new Error(`Expected 1 row to be updated, got ${rowCount} rows instead.`) + } + } + + async markError(id: string, error: any): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + update "integrationStreams" + set state = :state, + "processedAt" = now(), + error = :error, + "updatedAt" = now() + where id = :id; + ` + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const [_, rowCount] = await seq.query(query, { + replacements: { + id, + error, + state: IntegrationStreamState.ERROR, + }, + type: QueryTypes.UPDATE, + transaction, + }) + + if (rowCount !== 1) { + throw new Error(`Expected 1 row to be updated, got ${rowCount} rows instead.`) + } + } +} diff --git a/backend/src/types/integrationStreamTypes.ts b/backend/src/types/integrationStreamTypes.ts new file mode 100644 index 0000000000..6ce9cf6935 --- /dev/null +++ b/backend/src/types/integrationStreamTypes.ts @@ -0,0 +1,27 @@ +import { String } from 'aws-sdk/clients/cloudtrail' + +export enum IntegrationStreamState { + PENDING = 'pending', + PROCESSING = 'processing', + PROCESSED = 'processed', + ERROR = 'error', +} + +export interface IntegrationStream { + id: string + runId: String + integrationId: string + state: IntegrationStreamState + name: string + metadata: any + processedAt: string | null + error: any | null + createdAt: string +} + +export interface DbIntegrationStreamCreateData { + runId: string + integrationId: string + name: string + metadata: any +} From 6e4c82a7870a0be3db5ce02329b59b4d0511208e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 28 Mar 2023 10:24:23 +0200 Subject: [PATCH 2/5] Integration runs in the database, individual run streams in the database --- backend/package.json | 1 + backend/src/bin/discord-ws.ts | 4 +- backend/src/bin/scripts/continue-run.ts | 82 ++ .../src/bin/scripts/process-integration.ts | 51 +- .../V1679825091__stream-delaying.sql | 55 +- 
.../repositories/integrationRunRepository.ts | 380 +++++++++ .../integrationStreamRepository.ts | 88 +- .../services/integrationProcessor.ts | 779 +++++++++--------- .../services/integrationServiceBase.ts | 29 +- .../integrations/discordIntegrationService.ts | 39 +- .../integrations/githubIntegrationService.ts | 5 +- .../hackerNewsIntegrationService.ts | 5 +- .../premium/linkedinIntegrationService.ts | 9 +- .../integrations/redditIntegrationService.ts | 5 +- .../integrations/slackIntegrationService.ts | 13 +- .../stackOverflowIntegrationService.ts | 9 +- .../integrations/twitterIntegrationService.ts | 3 +- .../integrations/types/discordTypes.ts | 4 +- backend/src/services/conversationService.ts | 1 - backend/src/services/integrationService.ts | 585 ++++++++----- backend/src/types/integration/stepResult.ts | 15 +- backend/src/types/integrationRunTypes.ts | 29 + backend/src/types/integrationStreamTypes.ts | 13 +- .../mq/nodeWorkerIntegrationProcessMessage.ts | 13 +- 24 files changed, 1496 insertions(+), 721 deletions(-) create mode 100644 backend/src/bin/scripts/continue-run.ts create mode 100644 backend/src/database/repositories/integrationRunRepository.ts create mode 100644 backend/src/types/integrationRunTypes.ts diff --git a/backend/package.json b/backend/package.json index ffc517a021..c125369218 100644 --- a/backend/package.json +++ b/backend/package.json @@ -31,6 +31,7 @@ "format": "npx prettier --write .", "tsc-check": "tsc --noEmit", "script:process-integration": "SERVICE=script ts-node ./src/bin/scripts/process-integration.ts", + "script:continue-run": "SERVICE=script ts-node ./src/bin/scripts/continue-run.ts", "script:change-tenant-plan": "SERVICE=script ts-node ./src/bin/scripts/change-tenant-plan.ts", "script:process-webhook": "SERVICE=script ts-node ./src/bin/scripts/process-webhook.ts", "script:send-weekly-analytics-email": "SERVICE=script ts-node ./src/bin/scripts/send-weekly-analytics-email.ts", diff --git a/backend/src/bin/discord-ws.ts 
b/backend/src/bin/discord-ws.ts index 219365fc29..a2fa6f2941 100644 --- a/backend/src/bin/discord-ws.ts +++ b/backend/src/bin/discord-ws.ts @@ -221,11 +221,13 @@ setImmediate(async () => { } if (triggerCheck) { + const repoOptions = await SequelizeRepository.getDefaultIRepositoryOptions() + const integrations = await IntegrationRepository.findAllActive(PlatformType.DISCORD) if (integrations.length > 0) { log.warn(`Found ${integrations.length} integrations to trigger check for!`) const service = new DiscordIntegrationService() - await service.triggerIntegrationCheck(integrations) + await service.triggerIntegrationCheck(integrations, repoOptions) } else { log.warn('Found no integrations to trigger check for!') } diff --git a/backend/src/bin/scripts/continue-run.ts b/backend/src/bin/scripts/continue-run.ts new file mode 100644 index 0000000000..0546572927 --- /dev/null +++ b/backend/src/bin/scripts/continue-run.ts @@ -0,0 +1,82 @@ +import commandLineArgs from 'command-line-args' +import commandLineUsage from 'command-line-usage' +import * as fs from 'fs' +import path from 'path' +import { createServiceLogger } from '../../utils/logging' +import SequelizeRepository from '../../database/repositories/sequelizeRepository' +import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS' +import { NodeWorkerIntegrationProcessMessage } from '../../types/mq/nodeWorkerIntegrationProcessMessage' +import IntegrationRunRepository from '../../database/repositories/integrationRunRepository' +import { IntegrationRunState } from '../../types/integrationRunTypes' + +const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8') + +const log = createServiceLogger() + +const options = [ + { + name: 'run', + alias: 'r', + typeLabel: '{underline runId}', + type: String, + description: + 'The unique ID of integration run that you would like to continue processing. 
Use comma delimiter when sending multiple integration runs.', + }, + { + name: 'help', + alias: 'h', + type: Boolean, + description: 'Print this usage guide.', + }, +] +const sections = [ + { + content: banner, + raw: true, + }, + { + header: 'Continue Processing Integration Run', + content: 'Trigger processing of integration run.', + }, + { + header: 'Options', + optionList: options, + }, +] + +const usage = commandLineUsage(sections) +const parameters = commandLineArgs(options) + +if (parameters.help && !parameters.run) { + console.log(usage) +} else { + setImmediate(async () => { + const options = await SequelizeRepository.getDefaultIRepositoryOptions() + + const runRepo = new IntegrationRunRepository(options) + + const runIds = parameters.run.split(',') + for (const runId of runIds) { + const run = await runRepo.findById(runId) + + if (!run) { + log.error({ runId }, 'Integration run not found!') + process.exit(1) + } else { + await log.info({ runId }, 'Integration run found - triggering SQS message!') + + if (run.state !== IntegrationRunState.PENDING) { + log.warn( + { currentState: run.state }, + `Setting integration state to ${IntegrationRunState.PENDING}!`, + ) + await runRepo.restart(run.id) + } + + await sendNodeWorkerMessage(run.tenantId, new NodeWorkerIntegrationProcessMessage(run.id)) + } + } + + process.exit(0) + }) +} diff --git a/backend/src/bin/scripts/process-integration.ts b/backend/src/bin/scripts/process-integration.ts index 09958854f9..03bafb1bda 100644 --- a/backend/src/bin/scripts/process-integration.ts +++ b/backend/src/bin/scripts/process-integration.ts @@ -7,6 +7,8 @@ import SequelizeRepository from '../../database/repositories/sequelizeRepository import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS' import { NodeWorkerIntegrationProcessMessage } from '../../types/mq/nodeWorkerIntegrationProcessMessage' import IntegrationRepository from '../../database/repositories/integrationRepository' +import 
IntegrationRunRepository from '../../database/repositories/integrationRunRepository' +import { IntegrationRunState } from '../../types/integrationRunTypes' const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8') @@ -65,19 +67,31 @@ if (parameters.help || (!parameters.integration && !parameters.platform)) { const onboarding = parameters.onboarding const options = await SequelizeRepository.getDefaultIRepositoryOptions() + const runRepo = new IntegrationRunRepository(options) + if (parameters.platform) { const integrations = await IntegrationRepository.findAllActive(parameters.platform) for (const i of integrations) { const integration = i as any log.info({ integrationId: integration.id, onboarding }, 'Triggering SQS message!') + + const existingRun = await runRepo.findLastProcessingRun(integration.id) + + if (existingRun && existingRun.onboarding) { + log.error('Integration is already processing, skipping!') + return + } + + const run = await runRepo.create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding, + state: IntegrationRunState.PENDING, + }) + await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - integration.platform, - integration.tenantId, - onboarding, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) } } else { @@ -92,15 +106,24 @@ if (parameters.help || (!parameters.integration && !parameters.platform)) { process.exit(1) } else { log.info({ integrationId, onboarding }, 'Integration found - triggering SQS message!') - await sendNodeWorkerMessage( - integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - integration.platform, - integration.tenantId, + + const existingRun = await runRepo.findLastProcessingRun(integration.id) + + if (existingRun && existingRun.onboarding) { + log.error('Integration is already processing, skipping!') + } else { + const run = await runRepo.create({ + integrationId: integration.id, + tenantId: 
integration.tenantId, onboarding, - integration.id, - ), - ) + state: IntegrationRunState.PENDING, + }) + + await sendNodeWorkerMessage( + integration.tenantId, + new NodeWorkerIntegrationProcessMessage(run.id), + ) + } } } } diff --git a/backend/src/database/migrations/V1679825091__stream-delaying.sql b/backend/src/database/migrations/V1679825091__stream-delaying.sql index aad86b0c74..2b2e4576fc 100644 --- a/backend/src/database/migrations/V1679825091__stream-delaying.sql +++ b/backend/src/database/migrations/V1679825091__stream-delaying.sql @@ -1,37 +1,50 @@ create table "integrationRuns" ( - id uuid not null, - "integrationId" uuid not null, + id uuid not null, + "tenantId" uuid not null, + "integrationId" uuid null, + "microserviceId" uuid null, - onboarding boolean not null, - state varchar(255) not null, + onboarding boolean not null, + state varchar(255) not null, - "processedAt" timestamptz null, - "streamCount" int null, - error json null, + "delayedUntil" timestamptz null, - "createdAt" timestamptz not null default now(), + "processedAt" timestamptz null, + error json null, + "processedStreamCount" int null, + "errorStreamCount" int null, - foreign key ("integrationId") references integrations (id), + "createdAt" timestamptz not null default now(), + "updatedAt" timestamptz not null default now(), + + foreign key ("tenantId") references tenants (id) on delete cascade, + foreign key ("integrationId") references integrations (id) on delete cascade, + foreign key ("microserviceId") references microservices (id) on delete cascade, primary key (id) ); create table "integrationStreams" ( - id uuid not null, - "runId" uuid not null, - "integrationId" uuid not null, + id uuid not null, + "runId" uuid not null, + "tenantId" uuid not null, + "integrationId" uuid null, + "microserviceId" uuid null, - state varchar(255) not null, + state varchar(255) not null, - name text not null, - metadata json not null, + name text not null, + metadata json not null, - 
"processedAt" timestamptz null, - error json null, + "processedAt" timestamptz null, + error json null, + retries int null, - "createdAt" timestamptz not null default now(), - "updatedAt" timestamptz not null default now(), + "createdAt" timestamptz not null default now(), + "updatedAt" timestamptz not null default now(), - foreign key ("runId") references "integrationRuns" (id), - foreign key ("integrationId") references integrations (id), + foreign key ("runId") references "integrationRuns" (id) on delete cascade, + foreign key ("tenantId") references tenants (id) on delete cascade, + foreign key ("integrationId") references integrations (id) on delete cascade, + foreign key ("microserviceId") references microservices (id) on delete cascade, primary key (id) ); \ No newline at end of file diff --git a/backend/src/database/repositories/integrationRunRepository.ts b/backend/src/database/repositories/integrationRunRepository.ts new file mode 100644 index 0000000000..6bbee124e3 --- /dev/null +++ b/backend/src/database/repositories/integrationRunRepository.ts @@ -0,0 +1,380 @@ +import { QueryTypes } from 'sequelize' +import { v1 as uuidV1 } from 'uuid' +import { + IntegrationRunState, + IntegrationRun, + DbIntegrationRunCreateData, +} from '../../types/integrationRunTypes' +import { IntegrationStreamState } from '../../types/integrationStreamTypes' +import { IRepositoryOptions } from './IRepositoryOptions' +import { RepositoryBase } from './repositoryBase' + +export default class IntegrationRunRepository extends RepositoryBase< + IntegrationRun, + string, + DbIntegrationRunCreateData, + unknown, + unknown +> { + public constructor(options: IRepositoryOptions) { + super(options, true) + } + + async findDelayedRuns(): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + select id, + "tenantId", + "integrationId", + "microserviceId", + onboarding, + state, + "delayedUntil", + "processedAt", + error, + "processedStreamCount", + 
"errorStreamCount", + "createdAt" + from "integrationRuns" + where state = :delayedState and "delayedUntil" <= now() + order by "createdAt" desc + ` + + const results = await seq.query(query, { + replacements: { + delayedState: IntegrationRunState.DELAYED, + }, + type: QueryTypes.SELECT, + transaction, + }) + + return results as IntegrationRun[] + } + + async findLastProcessingRun( + integrationId?: string, + microserviceId?: string, + ignoreId?: string, + ): Promise { + const transaction = this.transaction + + const seq = this.seq + + let condition = `` + const replacements: any = { + delayedState: IntegrationRunState.DELAYED, + processingState: IntegrationRunState.PROCESSING, + pendingState: IntegrationRunState.PENDING, + } + + if (integrationId) { + condition = ` "integrationId" = :integrationId ` + replacements.integrationId = integrationId + } else if (microserviceId) { + condition = ` "microserviceId" = :microserviceId ` + replacements.microserviceId = microserviceId + } else { + throw new Error(`Either integrationId or microserviceId must be provided!`) + } + + if (ignoreId) { + condition = `${condition} and id <> :ignoreId` + replacements.ignoreId = ignoreId + } + + const query = ` + select id, + "tenantId", + "integrationId", + "microserviceId", + onboarding, + state, + "delayedUntil", + "processedAt", + error, + "processedStreamCount", + "errorStreamCount", + "createdAt" + from "integrationRuns" + where state in (:delayedState, :processingState, :pendingState) and ${condition} + order by "createdAt" desc + limit 1 + ` + + const results = await seq.query(query, { + replacements, + type: QueryTypes.SELECT, + transaction, + }) + + if (results.length === 1) { + return results[0] as IntegrationRun + } + + return undefined + } + + override async findById(id: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + select id, + "tenantId", + "integrationId", + "microserviceId", + onboarding, + state, + 
"delayedUntil", + "processedAt", + error, + "processedStreamCount", + "errorStreamCount", + "createdAt" + from "integrationRuns" where id = :id; + ` + + const result = await seq.query(query, { + replacements: { + id, + }, + type: QueryTypes.SELECT, + transaction, + }) + + if (result.length !== 1) { + throw new Error(`Expected 1 row to be selected, got ${result.length} rows instead.`) + } + + return result[0] as IntegrationRun + } + + override async create(data: DbIntegrationRunCreateData): Promise { + const transaction = this.transaction + + const seq = this.seq + + const id = uuidV1() + + const query = ` + insert into "integrationRuns"(id, "tenantId", "integrationId", "microserviceId", onboarding, state) + values(:id, :tenantId, :integrationId, :microserviceId, :onboarding, :state) + returning "createdAt"; + ` + + const result = await seq.query(query, { + replacements: { + id, + tenantId: data.tenantId, + integrationId: data.integrationId || null, + microserviceId: data.microserviceId || null, + onboarding: data.onboarding, + state: data.state, + }, + type: QueryTypes.SELECT, + transaction, + }) + + if (result.length !== 1) { + throw new Error(`Expected 1 row to be inserted, got ${result.length} rows instead.`) + } + + return { + id, + tenantId: data.tenantId, + integrationId: data.integrationId, + microserviceId: data.microserviceId, + onboarding: data.onboarding, + state: data.state, + delayedUntil: null, + processedAt: null, + streamCount: null, + error: null, + createdAt: (result[0] as any).createdAt, + } + } + + async markProcessing(id: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + update "integrationRuns" + set state = :state, + "delayedUntil" = null, + "updatedAt" = now() + where id = :id + ` + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const [_, rowCount] = await seq.query(query, { + replacements: { + id, + state: IntegrationRunState.PROCESSING, + }, + type: 
QueryTypes.UPDATE, + transaction, + }) + + if (rowCount !== 1) { + throw new Error(`Expected 1 row to be updated, got ${rowCount} rows instead.`) + } + } + + async restart(id: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + update "integrationRuns" + set state = :state, + "delayedUntil" = null, + "processedAt" = null, + "processedStreamCount" = null, + "errorStreamCount" = null, + error = null, + "updatedAt" = now() + where id = :id + ` + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const [_, rowCount] = await seq.query(query, { + replacements: { + id, + state: IntegrationRunState.PENDING, + }, + type: QueryTypes.UPDATE, + transaction, + }) + + if (rowCount !== 1) { + throw new Error(`Expected 1 row to be updated, got ${rowCount} rows instead.`) + } + } + + async markError(id: string, error: any): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + update "integrationRuns" + set state = :state, + error = :error, + "updatedAt" = now() + where id = :id + ` + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const [_, rowCount] = await seq.query(query, { + replacements: { + id, + state: IntegrationRunState.ERROR, + error: JSON.stringify(error), + }, + type: QueryTypes.UPDATE, + transaction, + }) + + if (rowCount !== 1) { + throw new Error(`Expected 1 row to be updated, got ${rowCount} rows instead.`) + } + } + + async delay(id: string, until: Date): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + update "integrationRuns" + set "delayedUntil" = :until, + state = :state, + "updatedAt" = now() + where id = :id + ` + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const [_, rowCount] = await seq.query(query, { + replacements: { + id, + until, + state: IntegrationRunState.DELAYED, + }, + type: QueryTypes.UPDATE, + transaction, + }) + + if (rowCount !== 1) { + throw new 
Error(`Expected 1 row to be updated, got ${rowCount} rows instead.`) + } + } + + async touchState(id: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + update "integrationRuns" + set "processedStreamCount" = (select count(s.id) + from "integrationStreams" s + where s."runId" = :id + and s.state = :successStreamState), + "errorStreamCount" = (select count(s.id) + from "integrationStreams" s + where s."runId" = :id + and s.state = :errorStreamState), + "processedAt" = case + when (select count(s.id) = + (count(s.id) filter ( where s.state = :successStreamState ) + + count(s.id) filter (where s.state = :errorStreamState and s.retries >= :maxRetries)) + from "integrationStreams" s + where s."runId" = :id) then now() + end, + state = case + when (select (count(s.id) = + (count(s.id) filter ( where s.state = :successStreamState ) + + count(s.id) filter (where s.state = :errorStreamState))) and + (count(s.id) filter (where s.state = :errorStreamState and s.retries >= :maxRetries)) = 0 + from "integrationStreams" s + where s."runId" = :id) then :successRunState + when (select (count(s.id) = + (count(s.id) filter ( where s.state = :successStreamState ) + + count(s.id) filter (where s.state = :errorStreamState))) and + (count(s.id) filter (where s.state = :errorStreamState and s.retries >= :maxRetries)) > 0 + from "integrationStreams" s + where s."runId" = :id) then :errorRunState + end, + "updatedAt" = now() + where id = :id + returning state; + ` + + const result = await seq.query(query, { + replacements: { + id, + successStreamState: IntegrationStreamState.PROCESSED, + errorStreamState: IntegrationStreamState.ERROR, + successRunState: IntegrationRunState.PROCESSED, + processingRunState: IntegrationRunState.PROCESSING, + errorRunState: IntegrationRunState.ERROR, + maxRetries: 5, + }, + type: QueryTypes.SELECT, + transaction, + }) + + if (result.length !== 1) { + throw new Error(`Expected 1 row to be updated, got 
${result.length} rows instead.`) + } + + return (result[0] as any).state + } +} diff --git a/backend/src/database/repositories/integrationStreamRepository.ts b/backend/src/database/repositories/integrationStreamRepository.ts index f2e4a984de..c9e57639c2 100644 --- a/backend/src/database/repositories/integrationStreamRepository.ts +++ b/backend/src/database/repositories/integrationStreamRepository.ts @@ -28,12 +28,15 @@ export default class IntegrationStreamRepository extends RepositoryBase< const query = ` select id, "runId", + "tenantId", "integrationId", + "microserviceId", state, name, metadata, "processedAt", error, + retries, "createdAt" from "integrationStreams" where id = :id; ` @@ -61,14 +64,19 @@ export default class IntegrationStreamRepository extends RepositoryBase< const query = ` select id, "runId", + "tenantId", "integrationId", + "microserviceId", state, name, metadata, "processedAt", error, + retries, "createdAt" - from "integrationStreams" where "runId" = :runId; + from "integrationStreams" where "runId" = :runId + -- we are using uuid v1 so we can sort by it + order by "id"; ` const result = await seq.query(query, { @@ -87,12 +95,12 @@ export default class IntegrationStreamRepository extends RepositoryBase< const seq = this.seq - const batches = lodash.chunk(data, 999) + const batches = lodash.chunk(data, 999) as DbIntegrationStreamCreateData[][] const results: IntegrationStream[] = [] const query = ` - insert into "integrationStreams"(id, "runId", "integrationId", state, name, metadata) + insert into "integrationStreams"(id, "runId", "tenantId", "integrationId", "microserviceId", state, name, metadata) values ` @@ -104,14 +112,16 @@ export default class IntegrationStreamRepository extends RepositoryBase< for (const item of batch) { const id = uuidV1() values.push( - `(:id${i}, :runId${i}, :integrationId${i}, :state${i}, :name${i}, :metadata${i})`, + `(:id${i}, :runId${i}, :tenantId${i}, :integrationId${i}, :microserviceId${i}, :state${i}, 
:name${i}, :metadata${i})`, ) replacements[`id${i}`] = id replacements[`runId${i}`] = item.runId + replacements[`tenantId${i}`] = item.tenantId replacements[`state${i}`] = IntegrationStreamState.PENDING - replacements[`integrationId${i}`] = item.integrationId + replacements[`integrationId${i}`] = item.integrationId || null + replacements[`microserviceId${i}`] = item.microserviceId || null replacements[`name${i}`] = item.name - replacements[`metadata${i}`] = item.metadata + replacements[`metadata${i}`] = JSON.stringify(item.metadata || {}) i++ } @@ -135,13 +145,16 @@ export default class IntegrationStreamRepository extends RepositoryBase< results.push({ id: replacements[`id${j}`], runId: item.runId, + tenantId: item.tenantId, state: IntegrationStreamState.PENDING, integrationId: item.integrationId, + microserviceId: item.microserviceId, name: item.name, - metadata: item.metadata, + metadata: item.metadata || {}, createdAt, processedAt: null, error: null, + retries: null, }) } } @@ -157,8 +170,8 @@ export default class IntegrationStreamRepository extends RepositoryBase< const id = uuidV1() const query = ` - insert into "integrationStreams"(id, "runId", "integrationId", state, name, metadata) - values(:id, :runId, :integrationId, :state, :name, :metadata) + insert into "integrationStreams"(id, "runId", "tenantId", "integrationId", "microserviceId", state, name, metadata) + values(:id, :runId, :tenantId, :integrationId, :microserviceId, :state, :name, :metadata) returning "createdAt"; ` @@ -166,10 +179,12 @@ export default class IntegrationStreamRepository extends RepositoryBase< replacements: { id, runId: data.runId, + tenantId: data.tenantId, state: IntegrationStreamState.PENDING, - integrationId: data.integrationId, + integrationId: data.integrationId || null, + microserviceId: data.microserviceId || null, name: data.name, - metadata: data.metadata, + metadata: JSON.stringify(data.metadata || {}), }, type: QueryTypes.SELECT, transaction, @@ -182,13 +197,16 @@ export 
default class IntegrationStreamRepository extends RepositoryBase< return { id, runId: data.runId, + tenantId: data.tenantId, state: IntegrationStreamState.PENDING, integrationId: data.integrationId, + microserviceId: data.microserviceId, name: data.name, - metadata: data.metadata, + metadata: data.metadata || {}, createdAt: (result[0] as any).createdAt, processedAt: null, error: null, + retries: null, } } @@ -247,7 +265,7 @@ export default class IntegrationStreamRepository extends RepositoryBase< } } - async markError(id: string, error: any): Promise { + async markError(id: string, error: any): Promise { const transaction = this.transaction const seq = this.seq @@ -257,23 +275,57 @@ export default class IntegrationStreamRepository extends RepositoryBase< set state = :state, "processedAt" = now(), error = :error, + retries = coalesce(retries, 0) + 1, "updatedAt" = now() - where id = :id; + where id = :id + returning retries; ` // eslint-disable-next-line @typescript-eslint/no-unused-vars - const [_, rowCount] = await seq.query(query, { + const result = await seq.query(query, { replacements: { id, - error, + error: JSON.stringify(error), state: IntegrationStreamState.ERROR, }, + type: QueryTypes.SELECT, + transaction, + }) + + if (result.length !== 1) { + throw new Error(`Expected 1 row to be updated, got ${result.length} rows instead.`) + } + + return (result[0] as any).retries + } + + async reset(id: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const query = ` + update "integrationStreams" + set state = :state, + "processedAt" = null, + error = null, + retries = null, + "updatedAt" = now() + where id = :id; + ` + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const [_, count] = await seq.query(query, { + replacements: { + id, + state: IntegrationStreamState.PENDING, + }, type: QueryTypes.UPDATE, transaction, }) - if (rowCount !== 1) { - throw new Error(`Expected 1 row to be updated, got ${rowCount} rows 
instead.`) + if (count !== 1) { + throw new Error(`Expected 1 row to be updated, got ${count} rows instead.`) } } } diff --git a/backend/src/serverless/integrations/services/integrationProcessor.ts b/backend/src/serverless/integrations/services/integrationProcessor.ts index 1028fe3033..816e07d6b5 100644 --- a/backend/src/serverless/integrations/services/integrationProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationProcessor.ts @@ -1,7 +1,6 @@ // noinspection ExceptionCaughtLocallyJS import moment from 'moment' -import { v4 as uuid } from 'uuid' import path from 'path' import fs from 'fs' import { createChildLogger, Logger } from '../../../utils/logging' @@ -12,16 +11,18 @@ import { IServiceOptions } from '../../../services/IServiceOptions' import { singleOrDefault } from '../../../utils/arrays' import { IntegrationType, PlatformType } from '../../../types/integrationEnums' import { NodeWorkerIntegrationCheckMessage } from '../../../types/mq/nodeWorkerIntegrationCheckMessage' -import { - IIntegrationStreamRetry, - NodeWorkerIntegrationProcessMessage, -} from '../../../types/mq/nodeWorkerIntegrationProcessMessage' +import { NodeWorkerIntegrationProcessMessage } from '../../../types/mq/nodeWorkerIntegrationProcessMessage' import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS' import { DevtoIntegrationService } from './integrations/devtoIntegrationService' import { IntegrationServiceBase } from './integrationServiceBase' import bulkOperations from '../../dbOperations/operationsWorker' import { DiscordIntegrationService } from './integrations/discordIntegrationService' -import { IIntegrationStream, IStepContext } from '../../../types/integration/stepResult' +import { + IFailedIntegrationStream, + IIntegrationStream, + IProcessStreamResults, + IStepContext, +} from '../../../types/integration/stepResult' import { TwitterIntegrationService } from './integrations/twitterIntegrationService' import { HackerNewsIntegrationService } from 
'./integrations/hackerNewsIntegrationService' import { RedditIntegrationService } from './integrations/redditIntegrationService' @@ -37,7 +38,6 @@ import { i18n } from '../../../i18n' import { IRedisPubSubEmitter, RedisClient } from '../../../utils/redis' import RedisPubSubEmitter from '../../../utils/redis/pubSubEmitter' import { ApiWebsocketMessage } from '../../../types/mq/apiWebsocketMessage' -import { RedisCache } from '../../../utils/redis/redisCache' import SequelizeRepository from '../../../database/repositories/sequelizeRepository' import { IRepositoryOptions } from '../../../database/repositories/IRepositoryOptions' import IncomingWebhookRepository from '../../../database/repositories/incomingWebhookRepository' @@ -45,15 +45,22 @@ import { WebhookError, WebhookState } from '../../../types/webhooks' import { NodeWorkerProcessWebhookMessage } from '../../../types/mq/nodeWorkerProcessWebhookMessage' import SampleDataService from '../../../services/sampleDataService' import { sendSlackAlert, SlackAlertTypes } from '../../../utils/slackAlerts' - -const MAX_STREAM_RETRIES = 5 +import IntegrationRunRepository from '../../../database/repositories/integrationRunRepository' +import { IntegrationRun, IntegrationRunState } from '../../../types/integrationRunTypes' +import IntegrationStreamRepository from '../../../database/repositories/integrationStreamRepository' +import { + DbIntegrationStreamCreateData, + IntegrationStreamState, +} from '../../../types/integrationStreamTypes' export class IntegrationProcessor extends LoggingBase { private readonly integrationServices: IntegrationServiceBase[] - private readonly apiPubSubEmitter?: IRedisPubSubEmitter + private readonly integrationRunRepository: IntegrationRunRepository - private readonly redisCache?: RedisCache + private readonly integrationStreamRepository: IntegrationStreamRepository + + private readonly apiPubSubEmitter?: IRedisPubSubEmitter private tickTrackingMap: Map = new Map() @@ -98,12 +105,33 @@ export 
class IntegrationProcessor extends LoggingBase { this.apiPubSubEmitter = new RedisPubSubEmitter('api-pubsub', redisEmitterClient, (err) => { this.log.error({ err }, 'Error in api-ws emitter!') }) - - this.redisCache = new RedisCache('integrationProcessor', redisEmitterClient) } + + this.integrationRunRepository = new IntegrationRunRepository(options) + this.integrationStreamRepository = new IntegrationStreamRepository(options) } async processTick() { + await this.processCheckTick() + await this.processDelayedTick() + } + + private async processDelayedTick() { + this.log.trace('Checking for delayed integration runs!') + + const delayedRuns = await this.integrationRunRepository.findDelayedRuns() + + for (const run of delayedRuns) { + this.log.info({ runId: run.id }, 'Triggering delayed integration run processing!') + + await sendNodeWorkerMessage( + new Date().toISOString(), + new NodeWorkerIntegrationProcessMessage(run.id), + ) + } + } + + private async processCheckTick() { this.log.trace('Processing integration processor tick!') for (const intService of this.integrationServices) { @@ -146,21 +174,27 @@ export class IntegrationProcessor extends LoggingBase { if (microservices.length > 0) { this.log.debug({ type, count: microservices.length }, 'Found microservices to check!') for (const micro of microservices) { - const isProcessing = await this.redisCache.getValue(micro.id) - if (isProcessing === null) { + const existingRun = await this.integrationRunRepository.findLastProcessingRun( + undefined, + micro.id, + ) + if (!existingRun) { const microservice = micro as any + + const run = await this.integrationRunRepository.create({ + microserviceId: microservice.id, + tenantId: microservice.tenantId, + onboarding: false, + state: IntegrationRunState.PENDING, + }) + + this.log.debug({ type, runId: run.id }, 'Triggering microservice processing!') + await sendNodeWorkerMessage( microservice.tenantId, - new NodeWorkerIntegrationProcessMessage( - type, - 
microservice.tenantId, - false, - undefined, - microservice.id, - { - platform: PlatformType.TWITTER, - }, - ), + new NodeWorkerIntegrationProcessMessage(run.id, { + platform: PlatformType.TWITTER, + }), ) } } @@ -170,18 +204,22 @@ export class IntegrationProcessor extends LoggingBase { } else { // get the relevant integration service that is supposed to be configured already const intService = singleOrDefault(this.integrationServices, (s) => s.type === type) + const options = + (await SequelizeRepository.getDefaultIRepositoryOptions()) as IRepositoryOptions const integrations = await IntegrationRepository.findAllActive(type) if (integrations.length > 0) { logger.debug({ count: integrations.length }, 'Found integrations to check!') const inactiveIntegrations: any[] = [] for (const integration of integrations as any[]) { - const isProcessing = await this.redisCache.getValue(integration.id) - if (isProcessing === null) { + const existingRun = await this.integrationRunRepository.findLastProcessingRun( + integration.id, + ) + if (!existingRun) { inactiveIntegrations.push(integration) } } - await intService.triggerIntegrationCheck(inactiveIntegrations) + await intService.triggerIntegrationCheck(inactiveIntegrations, options) } else { logger.debug('Found no integrations to check!') } @@ -289,49 +327,82 @@ export class IntegrationProcessor extends LoggingBase { } async process(req: NodeWorkerIntegrationProcessMessage) { + if (!req.runId) { + this.log.warn("No runId provided! Skipping because it's an old message.") + return + } + + this.log.info({ runId: req.runId }, 'Detected integration run!') + + const run = await this.integrationRunRepository.findById(req.runId) + + const userContext = await getUserContext(run.tenantId) + + // load integration from database + const integration = run.integrationId + ? 
await IntegrationRepository.findById(run.integrationId, userContext) + : await IntegrationRepository.findByPlatform(req.metadata.platform, userContext) + const logger = createChildLogger('process', this.log, { - type: req.integrationType, - tenantId: req.tenantId, - integrationId: req.integrationId, - onboarding: req.onboarding, - microserviceId: req.microserviceId, + runId: req.runId, + type: integration.platform, + tenantId: integration.tenantId, + integrationId: run.integrationId, + onboarding: run.onboarding, + microserviceId: run.microserviceId, }) - logger.info('Processing integration!') - const startOfProcessing = moment() + logger.info('Processing integration!') - const userContext = await getUserContext(req.tenantId) userContext.log = logger - // load integration from database - const integration = req.integrationId - ? await IntegrationRepository.findById(req.integrationId, userContext) - : await IntegrationRepository.findByPlatform(req.metadata.platform, userContext) + const existingRun = await this.integrationRunRepository.findLastProcessingRun( + run.integrationId, + run.microserviceId, + req.runId, + ) - if (!req.onboarding) { - const processing = await this.redisCache.getValue(integration.id) - if (processing !== null) { - logger.info('Integration is already being processed!') - return - } + if (existingRun) { + logger.info('Integration is already being processed!') + await this.integrationRunRepository.markError(req.runId, { + message: 'Integration is already being processed!', + existingRunId: existingRun.id, + }) + return + } + + if (run.state === IntegrationRunState.PROCESSED) { + logger.warn('Integration is already processed!') + return + } + + if (run.state === IntegrationRunState.PENDING) { + logger.info('Started processing integration!') + } else if (run.state === IntegrationRunState.DELAYED) { + logger.info('Continued processing delayed integration!') + } else if (run.state === IntegrationRunState.ERROR) { + logger.info('Restarted processing 
errored integration!') + } else if (run.state === IntegrationRunState.PROCESSING) { + throw new Error(`Invalid state '${run.state}' for integration run!`) } - await this.redisCache.setValue(integration.id, 'processing', 5 * 60) + await this.integrationRunRepository.markProcessing(req.runId) + run.state = IntegrationRunState.PROCESSING // get the relevant integration service that is supposed to be configured already const intService = singleOrDefault( this.integrationServices, - (s) => s.type === req.integrationType, + (s) => s.type === integration.platform, ) if (intService === undefined) { logger.error('No integration service configured!') - throw new Error(`No integration service configured for type '${req.integrationType}'!`) + throw new Error(`No integration service configured for type '${integration.platform}'!`) } const stepContext: IStepContext = { startTimestamp: moment().utc().unix(), limitCount: integration.limitCount || 0, - onboarding: req.onboarding, + onboarding: run.onboarding, pipelineData: {}, integration, serviceContext: userContext, @@ -353,13 +424,12 @@ export class IntegrationProcessor extends LoggingBase { } // delete sample data on onboarding - if (req.onboarding) { + if (run.onboarding) { await new SampleDataService(userContext).deleteSampleData() } - const failedStreams = [] - let setError = false - let stillProcessing = false + // keep track of failed streams + const failedStreams: IFailedIntegrationStream[] = [] try { // check global limit reset @@ -391,14 +461,7 @@ export class IntegrationProcessor extends LoggingBase { if (err.rateLimitResetSeconds) { // need to delay integration processing logger.warn(err, 'Rate limit reached while preprocessing integration! 
Delaying...') - await this.handleRateLimitError( - logger, - intService, - req, - startOfProcessing, - err.rateLimitResetSeconds, - ) - stillProcessing = true + await this.handleRateLimitError(logger, run, err.rateLimitResetSeconds, stepContext) return } @@ -407,39 +470,43 @@ export class IntegrationProcessor extends LoggingBase { // detect streams to process for this integration let streams: IIntegrationStream[] - if ( - (req.retryStreams && req.retryStreams.length > 0) || - (req.remainingStreams && req.remainingStreams.length > 0) - ) { - const retryStreams = req.retryStreams || [] - streams = req.remainingStreams || [] - - logger.info( - { retryStreamCount: retryStreams.length, delayedStreamCount: streams.length }, - 'Detected retried/delayed streams in request - skipping integration service getStreams method call!', - ) - for (const retryStream of retryStreams) { - const stream = retryStream.stream - stream.id = retryStream.id - streams.push(stream) - } + const dbStreams = await this.integrationStreamRepository.findByRunId(req.runId) + if (dbStreams.length > 0) { + streams = dbStreams + .filter( + (s) => + s.state === IntegrationStreamState.PENDING || + (s.state === IntegrationStreamState.ERROR && s.retries <= 5), + ) + .map((s) => ({ + id: s.id, + value: s.name, + metadata: s.metadata, + })) } else { logger.trace('Detecting streams!') try { - streams = await intService.getStreams(stepContext) + const pendingStreams = await intService.getStreams(stepContext) + const createStreams: DbIntegrationStreamCreateData[] = pendingStreams.map((s) => ({ + runId: req.runId, + tenantId: run.tenantId, + integrationId: run.integrationId, + microserviceId: run.microserviceId, + name: s.value, + metadata: s.metadata, + })) + const results = await this.integrationStreamRepository.bulkCreate(createStreams) + streams = results.map((r) => ({ + id: r.id, + value: r.name, + metadata: r.metadata, + })) } catch (err) { if (err.rateLimitResetSeconds) { // need to delay integration 
processing logger.warn(err, 'Rate limit reached while getting integration streams! Delaying...') - await this.handleRateLimitError( - logger, - intService, - req, - startOfProcessing, - err.rateLimitResetSeconds, - ) - stillProcessing = true + await this.handleRateLimitError(logger, run, err.rateLimitResetSeconds, stepContext) return } @@ -447,11 +514,6 @@ export class IntegrationProcessor extends LoggingBase { } } - // delay for retries/continuing with the remaining streams (in seconds) - let delay: number = 5 - - let exit = false - if (streams.length > 0) { logger.info({ streamCount: streams.length }, 'Detected streams to process!') @@ -459,17 +521,16 @@ export class IntegrationProcessor extends LoggingBase { let processedCount = 0 let notifyCount = 0 while (streams.length > 0) { - // reset value - await this.redisCache.setValue(integration.id, 'processing', 5 * 60) - if ((req as any).exiting) { - if (!req.onboarding) { + if (!run.onboarding) { logger.warn('Stopped processing integration (not onboarding)!') - exit = true break } else { logger.warn('Stopped processing integration (onboarding)!') - delay = 3 * 60 + const delayUntil = moment() + .add(3 * 60, 'seconds') + .toDate() + await this.integrationRunRepository.delay(req.runId, delayUntil) break } } @@ -479,199 +540,204 @@ export class IntegrationProcessor extends LoggingBase { processedCount++ notifyCount++ - // surround with try catch so if one stream fails we try all of them as well just in case + let processStreamResult: IProcessStreamResults + + logger.trace({ streamId: stream.id }, 'Processing stream!') + await this.integrationStreamRepository.markProcessing(stream.id) try { - logger.trace( - { stream: JSON.stringify(stream) }, - `Processing stream! 
Still have ${streams.length} streams left to process!`, - ) - let processStreamResult - try { - processStreamResult = await intService.processStream(stream, stepContext) - } catch (err) { - if (err.rateLimitResetSeconds) { - logger.warn( - { stream: JSON.stringify(stream), delay, message: err.message }, - 'Rate limit reached while processing stream! Delaying...', - ) - streams.push(stream) - await this.handleRateLimitError( - logger, - intService, - req, - startOfProcessing, - err.rateLimitResetSeconds, - stepContext, - failedStreams, - streams, - ) - stillProcessing = true - return - } - throw err + processStreamResult = await intService.processStream(stream, stepContext) + } catch (err) { + if (err.rateLimitResetSeconds) { + logger.warn( + { streamId: stream.id, message: err.message }, + 'Rate limit reached while processing stream! Delaying...', + ) + streams.push(stream) + await this.handleRateLimitError( + logger, + run, + err.rateLimitResetSeconds, + stepContext, + stream, + ) + return } - if (processStreamResult.newStreams && processStreamResult.newStreams.length > 0) { - streams.push(...processStreamResult.newStreams) + const retries = await this.integrationStreamRepository.markError(stream.id, { + errorPoint: 'process_stream', + message: err.message, + stack: err.stack, + errorString: JSON.stringify(err), + }) + + logger.error(err, { retries, streamId: stream.id }, 'Error while processing stream!') - logger.info( - `Detected ${processStreamResult.newStreams.length} new streams to process! Now we have ${streams.length} streams to process.`, + failedStreams.push({ + ...stream, + retries, + }) + } + + if (processStreamResult) { + // surround with try catch so if one stream fails we try all of them as well just in case + try { + logger.trace( + { stream: JSON.stringify(stream) }, + `Processing stream results! 
Still have ${streams.length} streams left to process!`, ) - } - for (const operation of processStreamResult.operations) { - if (operation.records.length > 0) { - logger.trace( - { operationType: operation.type }, - `Processing bulk operation with ${operation.records.length} records!`, + if (processStreamResult.newStreams && processStreamResult.newStreams.length > 0) { + const dbCreateStreams: DbIntegrationStreamCreateData[] = + processStreamResult.newStreams.map((s) => ({ + runId: req.runId, + tenantId: run.tenantId, + integrationId: run.integrationId, + microserviceId: run.microserviceId, + name: s.value, + metadata: s.metadata, + })) + + const results = await this.integrationStreamRepository.bulkCreate(dbCreateStreams) + + const newStreams: IIntegrationStream[] = results.map((r) => ({ + id: r.id, + value: r.name, + metadata: r.metadata, + })) + + streams.push(...newStreams) + + logger.info( + `Detected ${processStreamResult.newStreams.length} new streams to process! Now we have ${streams.length} streams to process.`, ) - stepContext.limitCount += operation.records.length - await bulkOperations(integration.tenantId, operation.type, operation.records) } - } - if (processStreamResult.nextPageStream !== undefined) { - if ( - !req.onboarding && - (await intService.isProcessingFinished( - stepContext, - stream, - processStreamResult.operations, - processStreamResult.lastRecordTimestamp, - )) - ) { - logger.warn('Integration processing finished because of service implementation!') - } else { - logger.trace( - { currentStream: JSON.stringify(stream) }, - `Detected next page stream! 
Now we have ${streams.length} left to process!`, - ) - streams.push(processStreamResult.nextPageStream) + for (const operation of processStreamResult.operations) { + if (operation.records.length > 0) { + logger.trace( + { operationType: operation.type }, + `Processing bulk operation with ${operation.records.length} records!`, + ) + stepContext.limitCount += operation.records.length + await bulkOperations(integration.tenantId, operation.type, operation.records) + } } - } - if (processStreamResult.sleep !== undefined && processStreamResult.sleep > 0) { - logger.warn( - `Stream processing resulted in a requested delay of ${processStreamResult.sleep}! Will delay ${streams.length} streams!`, - ) - - delay = processStreamResult.sleep - break - } + if (processStreamResult.nextPageStream !== undefined) { + if ( + !run.onboarding && + (await intService.isProcessingFinished( + stepContext, + stream, + processStreamResult.operations, + processStreamResult.lastRecordTimestamp, + )) + ) { + logger.warn('Integration processing finished because of service implementation!') + } else { + logger.trace( + { currentStream: JSON.stringify(stream) }, + `Detected next page stream! 
Now we have ${streams.length} left to process!`, + ) + const result = await this.integrationStreamRepository.create({ + runId: req.runId, + tenantId: run.tenantId, + integrationId: run.integrationId, + microserviceId: run.microserviceId, + name: processStreamResult.nextPageStream.value, + metadata: processStreamResult.nextPageStream.metadata, + }) + + streams.push({ + id: result.id, + value: result.name, + metadata: result.metadata, + }) + } + } - if (intService.globalLimit > 0 && stepContext.limitCount >= intService.globalLimit) { - // if limit reset frequency is 0 we don't need to care about limits - if (intService.limitResetFrequencySeconds > 0) { + if (processStreamResult.sleep !== undefined && processStreamResult.sleep > 0) { logger.warn( - { - limitCount: stepContext.limitCount, - globalLimit: intService.globalLimit, - streamsLeft: streams.length, - }, - 'We reached a global limit - stopping processing!', + `Stream processing resulted in a requested delay of ${processStreamResult.sleep}! 
Will delay ${streams.length} streams!`, ) - integration.limitCount = stepContext.limitCount + const delayUntil = moment().add(processStreamResult.sleep, 'seconds').toDate() + await this.integrationRunRepository.delay(req.runId, delayUntil) + break + } - const secondsSinceLastReset = moment() - .utc() - .diff(moment(integration.limitLastResetAt).utc(), 'seconds') + if (intService.globalLimit > 0 && stepContext.limitCount >= intService.globalLimit) { + // if limit reset frequency is 0 we don't need to care about limits + if (intService.limitResetFrequencySeconds > 0) { + logger.warn( + { + limitCount: stepContext.limitCount, + globalLimit: intService.globalLimit, + streamsLeft: streams.length, + }, + 'We reached a global limit - stopping processing!', + ) + + integration.limitCount = stepContext.limitCount + + const secondsSinceLastReset = moment() + .utc() + .diff(moment(integration.limitLastResetAt).utc(), 'seconds') + + if (secondsSinceLastReset < intService.limitResetFrequencySeconds) { + const delayUntil = moment() + .add(intService.limitResetFrequencySeconds - secondsSinceLastReset, 'seconds') + .toDate() + await this.integrationRunRepository.delay(req.runId, delayUntil) + } - if (secondsSinceLastReset < intService.limitResetFrequencySeconds) { - delay = intService.limitResetFrequencySeconds - secondsSinceLastReset + break } + } - break + if (notifyCount === 50 || streams.length === 0) { + logger.info( + `Processed ${processedCount} streams! Still have ${streams.length} to process.`, + ) + notifyCount = 0 } - } - if (notifyCount === 50 || streams.length === 0) { - logger.info( - `Processed ${processedCount} streams! 
Still have ${streams.length} to process.`, + await this.integrationStreamRepository.markProcessed(stream.id) + } catch (err) { + logger.error( + err, + { stream: JSON.stringify(stream) }, + 'Error processing stream results!', ) - notifyCount = 0 + const retries = await this.integrationStreamRepository.markError(stream.id, { + errorPoint: 'process_stream_results', + message: err.message, + stack: err.stack, + errorString: JSON.stringify(err), + }) + failedStreams.push({ + ...stream, + retries, + }) } - } catch (err) { - logger.error(err, { stream: JSON.stringify(stream) }, 'Error processing a stream!') - failedStreams.push(stream) } } // postprocess integration settings await intService.postprocess(stepContext, failedStreams, streams) - if (!exit && (streams.length > 0 || failedStreams.length > 0)) { - logger.warn( - { failed: failedStreams.length, remaining: streams.length }, - 'Integration processing finished - some streams were not processed!', - ) - - const existingRetryStreams = req.retryStreams || [] - - const retryStreams: IIntegrationStreamRetry[] = [] - let streamRetryLimitReached = false - for (const failedStream of failedStreams) { - let retryCount = 1 - let id = uuid() - if (failedStream.id) { - for (const existingRetryStream of existingRetryStreams) { - if (failedStream.id === existingRetryStream.id) { - retryCount = existingRetryStream.retryCount + 1 - id = existingRetryStream.id - break - } - } - } - - if (retryCount > MAX_STREAM_RETRIES) { - logger.warn( - { failedStream: JSON.stringify(failedStream) }, - 'Failed stream will not be retried because it reached retry limit!', - ) - streamRetryLimitReached = true - } else { - retryStreams.push({ - id, - retryCount, - stream: failedStream, - }) - - if (delay < retryCount * 5) { - delay = retryCount * 5 - } - } - } - - if (streams.length > 0 || retryStreams.length > 0) { - await sendNodeWorkerMessage( - req.tenantId, - new NodeWorkerIntegrationProcessMessage( - req.integrationType, - req.tenantId, - 
req.onboarding, - req.integrationId, - req.microserviceId, - req.metadata, - retryStreams, - streams, - ), - delay, - ) - } else if (streamRetryLimitReached && req.onboarding) { - setError = true - } - } logger.info('Done processing integration!') } else { logger.warn('No streams detected!') } } catch (err) { logger.error(err, 'Error while processing integration!') - setError = req.onboarding } finally { - if (!stillProcessing) { - let emailSentAt - if (!setError && !integration.emailSentAt) { + const newState = await this.integrationRunRepository.touchState(req.runId) + + let emailSentAt + if (newState === IntegrationRunState.PROCESSED) { + if (!integration.emailSentAt) { const tenantUsers = await UserRepository.findAllUsersOfTenant(integration.tenantId) emailSentAt = new Date() for (const user of tenantUsers) { @@ -681,159 +747,84 @@ export class IntegrationProcessor extends LoggingBase { }).sendTo(user.email) } } + } - await this.redisCache.delete(integration.id) - - await IntegrationRepository.update( - integration.id, - { - status: setError ? 
'error' : 'done', - emailSentAt, - settings: stepContext.integration.settings, - refreshToken: stepContext.integration.refreshToken, - token: stepContext.integration.token, - }, - userContext, - ) + let status + switch (newState) { + case IntegrationRunState.PROCESSED: + status = 'done' + break + case IntegrationRunState.ERROR: + status = 'error' + break + default: + status = integration.status + } - if (setError) { - await sendSlackAlert(SlackAlertTypes.INTEGRATION_ERROR, integration, userContext, logger) - } + await IntegrationRepository.update( + integration.id, + { + status, + emailSentAt, + settings: stepContext.integration.settings, + refreshToken: stepContext.integration.refreshToken, + token: stepContext.integration.token, + }, + userContext, + ) - if (req.onboarding && this.apiPubSubEmitter) { - this.apiPubSubEmitter.emit( - 'user', - new ApiWebsocketMessage( - 'integration-completed', - JSON.stringify({ - integrationId: integration.id, - status: setError ? 'error' : 'done', - }), - undefined, - integration.tenantId, - ), - ) + if (newState === IntegrationRunState.PROCESSING) { + if (failedStreams.length > 0) { + logger.warn('Integration ended but we are still processing - delaying for a minute!') + const delayUntil = moment().add(60, 'seconds') + await this.integrationRunRepository.delay(run.id, delayUntil.toDate()) + } else { + logger.error('Integration ended but we are still processing!') } + } else if (newState === IntegrationRunState.ERROR) { + await sendSlackAlert(SlackAlertTypes.INTEGRATION_ERROR, integration, userContext, logger) + } + + if (run.onboarding && this.apiPubSubEmitter) { + this.apiPubSubEmitter.emit( + 'user', + new ApiWebsocketMessage( + 'integration-completed', + JSON.stringify({ + integrationId: integration.id, + status, + }), + undefined, + integration.tenantId, + ), + ) } } } private async handleRateLimitError( logger: Logger, - intService: IntegrationServiceBase, - req: NodeWorkerIntegrationProcessMessage, - startOfProcessing: 
moment.Moment, + run: IntegrationRun, rateLimitResetSeconds: number, - context?: IStepContext, - failedStreams?: IIntegrationStream[], - remainingStreams?: IIntegrationStream[], + context: IStepContext, + stream?: IIntegrationStream, ): Promise { - if (req.onboarding) { - const totalProcessingSeconds = - moment().diff(startOfProcessing, 'seconds') + - rateLimitResetSeconds + - (req.totalDuration || 0) - - // if we are processing for more than 7 days, we should just stop - if (totalProcessingSeconds > 7 * 24 * 60 * 60) { - logger.error( - { totalProcessing: totalProcessingSeconds }, - 'We are processing this integration onboarding for more than 7 days - stopping!', - ) - await IntegrationRepository.update( - context.integration.id, - { - status: 'error', - settings: context.integration.settings, - refreshToken: context.integration.refreshToken, - token: context.integration.token, - }, - context.repoContext, - ) - return - } - - // mark it as in process so that we don't do checks on it while we wait... 
- await this.redisCache.setValue(req.integrationId, 'processing', rateLimitResetSeconds + 5) - - if (context) { - try { - await intService.postprocess(context, failedStreams, remainingStreams) - await IntegrationRepository.update( - context.integration.id, - { - settings: context.integration.settings, - refreshToken: context.integration.refreshToken, - token: context.integration.token, - }, - context.repoContext, - ) - } catch (err) { - logger.error(err, 'Error while postprocessing integration!') - await IntegrationRepository.update( - context.integration.id, - { - status: 'error', - settings: context.integration.settings, - refreshToken: context.integration.refreshToken, - token: context.integration.token, - }, - context.repoContext, - ) - return - } - } - - if (remainingStreams === undefined) { - logger.warn( - { totalProcessingSeconds }, - 'No remaining streams - delaying entire integration!', - ) - await sendNodeWorkerMessage( - req.tenantId, - new NodeWorkerIntegrationProcessMessage( - req.integrationType, - req.tenantId, - true, - req.integrationId, - req.microserviceId, - req.metadata, - undefined, - undefined, - totalProcessingSeconds, - ), - rateLimitResetSeconds + 30, - ) - } else { - logger.warn({ totalProcessingSeconds }, 'Delaying integration processing!') - const streams: IIntegrationStream[] = [] - if (failedStreams !== undefined && failedStreams.length > 0) { - logger.warn( - 'We have some failed streams - we will mark them as normal streams for delayed processing!', - ) - streams.push(...failedStreams) - } + await IntegrationRepository.update( + context.integration.id, + { + settings: context.integration.settings, + refreshToken: context.integration.refreshToken, + token: context.integration.token, + }, + context.repoContext, + ) - streams.push(...remainingStreams) + logger.warn('Rate limit reached, delaying integration processing!') + const delayUntil = moment().add(rateLimitResetSeconds + 30, 'seconds') + await 
this.integrationRunRepository.delay(run.id, delayUntil.toDate()) - await sendNodeWorkerMessage( - req.tenantId, - new NodeWorkerIntegrationProcessMessage( - req.integrationType, - req.tenantId, - true, - req.integrationId, - req.microserviceId, - req.metadata, - [], - streams, - totalProcessingSeconds, - ), - rateLimitResetSeconds + 5, - ) - } - } else { - logger.warn('Not onboarding - skipping delay because of rate limit!') + if (stream) { + await this.integrationStreamRepository.reset(stream.id) } } } diff --git a/backend/src/serverless/integrations/services/integrationServiceBase.ts b/backend/src/serverless/integrations/services/integrationServiceBase.ts index 06232f65fd..6f80d7a622 100644 --- a/backend/src/serverless/integrations/services/integrationServiceBase.ts +++ b/backend/src/serverless/integrations/services/integrationServiceBase.ts @@ -1,9 +1,11 @@ import { SuperfaceClient } from '@superfaceai/one-sdk' import moment from 'moment' import crypto from 'crypto' +import { IRepositoryOptions } from '../../../database/repositories/IRepositoryOptions' import { createServiceChildLogger } from '../../../utils/logging' import { IIntegrationStream, + IPendingStream, IProcessStreamResults, IProcessWebhookResults, IStepContext, @@ -13,6 +15,8 @@ import { IntegrationType } from '../../../types/integrationEnums' import { IS_TEST_ENV } from '../../../config' import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS' import { NodeWorkerIntegrationProcessMessage } from '../../../types/mq/nodeWorkerIntegrationProcessMessage' +import IntegrationRunRepository from '../../../database/repositories/integrationRunRepository' +import { IntegrationRunState } from '../../../types/integrationRunTypes' const logger = createServiceChildLogger('integrationService') @@ -51,17 +55,24 @@ export abstract class IntegrationServiceBase { this.limitResetFrequencySeconds = 0 } - async triggerIntegrationCheck(integrations: any[]): Promise { + async triggerIntegrationCheck(integrations: 
any[], options: IRepositoryOptions): Promise { + const repository = new IntegrationRunRepository(options) + for (const integration of integrations) { - logger.info({ integrationId: integration.id }, 'Triggering integration processing!') + const run = await repository.create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: false, + state: IntegrationRunState.PENDING, + }) + + logger.info( + { integrationId: integration.id, runId: run.id }, + 'Triggering integration processing!', + ) await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - this.type, - integration.tenantId, - false, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) } } @@ -74,7 +85,7 @@ export abstract class IntegrationServiceBase { // do nothing - override if something is needed } - abstract getStreams(context: IStepContext): Promise + abstract getStreams(context: IStepContext): Promise abstract processStream( stream: IIntegrationStream, diff --git a/backend/src/serverless/integrations/services/integrations/discordIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/discordIntegrationService.ts index 81f48c233c..2f88dd4a49 100644 --- a/backend/src/serverless/integrations/services/integrations/discordIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/discordIntegrationService.ts @@ -14,6 +14,7 @@ import { MemberAttributeName } from '../../../../database/attributes/member/enum import MemberAttributeSettingsService from '../../../../services/memberAttributeSettingsService' import { IIntegrationStream, + IPendingStream, IProcessStreamResults, IProcessWebhookResults, IStepContext, @@ -37,11 +38,17 @@ import { getMessage } from '../../usecases/discord/getMessage' import { createRedisClient } from '../../../../utils/redis' import { RedisCache } from '../../../../utils/redis/redisCache' import { getChannel } from 
'../../usecases/discord/getChannel' +import { IRepositoryOptions } from '../../../../database/repositories/IRepositoryOptions' +import IntegrationRunRepository from '../../../../database/repositories/integrationRunRepository' +import { IntegrationRunState } from '../../../../types/integrationRunTypes' +import { createServiceChildLogger } from '../../../../utils/logging' /* eslint class-methods-use-this: 0 */ /* eslint-disable @typescript-eslint/no-unused-vars */ +const logger = createServiceChildLogger('discordIntegrationService') + export class DiscordIntegrationService extends IntegrationServiceBase { static readonly ENDPOINT_MAX_RETRY = 5 @@ -64,20 +71,32 @@ export class DiscordIntegrationService extends IntegrationServiceBase { return DiscordIntegrationService.token } - override async triggerIntegrationCheck(integrations: any[]): Promise { + override async triggerIntegrationCheck( + integrations: any[], + options: IRepositoryOptions, + ): Promise { + const repository = new IntegrationRunRepository(options) + let initialDelaySeconds = 0 const batches = lodash.chunk(integrations, 2) for (const batch of batches) { for (const integration of batch) { + const run = await repository.create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: false, + state: IntegrationRunState.PENDING, + }) + + logger.info( + { integrationId: integration.id, runId: run.id }, + 'Triggering discord integration processing!', + ) + await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - this.type, - integration.tenantId, - false, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), initialDelaySeconds, ) } @@ -198,8 +217,8 @@ export class DiscordIntegrationService extends IntegrationServiceBase { } } - async getStreams(context: IStepContext): Promise { - const predefined: IIntegrationStream[] = [ + async getStreams(context: IStepContext): Promise { + const predefined: IPendingStream[] = [ { value: 
'members', metadata: { @@ -438,7 +457,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase { context: IStepContext, records: DiscordApiMessage[], ): Promise { - const newStreams: IIntegrationStream[] = [] + const newStreams: IPendingStream[] = [] const activities: AddActivitiesSingle[] = [] for (const record of records) { diff --git a/backend/src/serverless/integrations/services/integrations/githubIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/githubIntegrationService.ts index db5119f9ef..867812d972 100644 --- a/backend/src/serverless/integrations/services/integrations/githubIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/githubIntegrationService.ts @@ -5,6 +5,7 @@ import { Repo, Repos } from '../../types/regularTypes' import { IntegrationType, PlatformType } from '../../../../types/integrationEnums' import { IIntegrationStream, + IPendingStream, IProcessStreamResults, IProcessWebhookResults, IStepContext, @@ -124,7 +125,7 @@ export class GithubIntegrationService extends IntegrationServiceBase { } } - async getStreams(context: IStepContext): Promise { + async getStreams(context: IStepContext): Promise { return context.pipelineData.repos.reduce((acc, repo) => { for (const endpoint of [ GithubStreamType.STARGAZERS, @@ -164,7 +165,7 @@ export class GithubIntegrationService extends IntegrationServiceBase { } let result - let newStreams: IIntegrationStream[] + let newStreams: IPendingStream[] switch (event) { case GithubStreamType.STARGAZERS: diff --git a/backend/src/serverless/integrations/services/integrations/hackerNewsIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/hackerNewsIntegrationService.ts index 486683a1bf..8192b8f0b3 100644 --- a/backend/src/serverless/integrations/services/integrations/hackerNewsIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/hackerNewsIntegrationService.ts @@ -6,6 +6,7 @@ 
import MemberAttributeSettingsService from '../../../../services/memberAttribute import { HackerNewsActivityType } from '../../../../types/activityTypes' import { IIntegrationStream, + IPendingStream, IProcessStreamResults, IStepContext, } from '../../../../types/integration/stepResult' @@ -58,7 +59,7 @@ export class HackerNewsIntegrationService extends IntegrationServiceBase { } } - async getStreams(context: IStepContext): Promise { + async getStreams(context: IStepContext): Promise { return context.pipelineData.posts.map((a: HackerNewsSearchResult) => ({ value: a.postId, metadata: { @@ -71,7 +72,7 @@ export class HackerNewsIntegrationService extends IntegrationServiceBase { stream: IIntegrationStream, context: IStepContext, ): Promise { - let newStreams: IIntegrationStream[] + let newStreams: IPendingStream[] const post: HackerNewsResponse = await getPost(stream.value, context.logger) diff --git a/backend/src/serverless/integrations/services/integrations/premium/linkedinIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/premium/linkedinIntegrationService.ts index 46ce4bfe35..714abfd7ab 100644 --- a/backend/src/serverless/integrations/services/integrations/premium/linkedinIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/premium/linkedinIntegrationService.ts @@ -7,6 +7,7 @@ import { LinkedInMemberAttributes } from '../../../../../database/attributes/mem import MemberAttributeSettingsService from '../../../../../services/memberAttributeSettingsService' import { IIntegrationStream, + IPendingStream, IProcessStreamResults, IStepContext, } from '../../../../../types/integration/stepResult' @@ -107,7 +108,7 @@ export class LinkedinIntegrationService extends IntegrationServiceBase { } } - async getStreams(context: IStepContext): Promise { + async getStreams(context: IStepContext): Promise { const posts: ILinkedInOrganizationPost[] = context.pipelineData.posts const commentsStream = posts.map((p) => ({ @@ 
-317,7 +318,7 @@ export class LinkedinIntegrationService extends IntegrationServiceBase { ) const activities: AddActivitiesSingle[] = [] - const newStreams: IIntegrationStream[] = [] + const newStreams: IPendingStream[] = [] for (const comment of comments) { const member = await this.parseMember(comment.authorUrn, context) @@ -413,7 +414,7 @@ export class LinkedinIntegrationService extends IntegrationServiceBase { const user = await getMember(context.pipelineData.nangoId, userId, context.logger) return JSON.stringify(user) }, - 24 * 60 * 60, + 2 * 24 * 60 * 60, ) const user = JSON.parse(userString) @@ -450,7 +451,7 @@ export class LinkedinIntegrationService extends IntegrationServiceBase { ) return JSON.stringify(organization) }, - 24 * 60 * 60, + 2 * 24 * 60 * 60, ) const organization = JSON.parse(organizationString) diff --git a/backend/src/serverless/integrations/services/integrations/redditIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/redditIntegrationService.ts index 703c591fe7..88b7114163 100644 --- a/backend/src/serverless/integrations/services/integrations/redditIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/redditIntegrationService.ts @@ -6,6 +6,7 @@ import MemberAttributeSettingsService from '../../../../services/memberAttribute import { RedditActivityType } from '../../../../types/activityTypes' import { IIntegrationStream, + IPendingStream, IProcessStreamResults, IStepContext, } from '../../../../types/integration/stepResult' @@ -62,7 +63,7 @@ export class RedditIntegrationService extends IntegrationServiceBase { * @param context context passed along worker messages * @returns an array of streams to process */ - async getStreams(context: IStepContext): Promise { + async getStreams(context: IStepContext): Promise { return context.pipelineData.subreddits.map((subreddit: string) => ({ value: `subreddit:${subreddit}`, metadata: { @@ -133,7 +134,7 @@ export class 
RedditIntegrationService extends IntegrationServiceBase { const lastRecord = activities.length > 0 ? activities[activities.length - 1] : undefined // If we got results, we will want to check the next page - const nextPageStream: IIntegrationStream = + const nextPageStream: IPendingStream = posts.length > 0 ? { value: stream.value, metadata: { ...(stream.metadata || {}), after: nextPage } } : undefined diff --git a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts index 84a8e4c345..c76e38d498 100644 --- a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts @@ -3,6 +3,7 @@ import sanitizeHtml from 'sanitize-html' import { SLACK_CONFIG } from '../../../../config' import { IIntegrationStream, + IPendingStream, IProcessStreamResults, IStepContext, IStreamResultOperation, @@ -85,7 +86,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { await service.createPredefined(SlackMemberAttributes) } - async getStreams(context: IStepContext): Promise { + async getStreams(context: IStepContext): Promise { const streams = [] if (context.onboarding) { @@ -114,7 +115,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { const operations: IStreamResultOperation[] = [] let nextPage: string - let newStreams: IIntegrationStream[] + let newStreams: IPendingStream[] let lastRecord switch (stream.value) { @@ -205,7 +206,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { throw new Error(`Unknown stream value ${stream.value}!`) } - const nextPageStream: IIntegrationStream = nextPage + const nextPageStream: IPendingStream = nextPage ? 
{ value: stream.value, metadata: { ...(stream.metadata || {}), page: nextPage } } : undefined @@ -234,7 +235,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { records: any[], stream: IIntegrationStream, context: IStepContext, - ): Promise<{ activities: AddActivitiesSingle[]; additionalStreams: IIntegrationStream[] }> { + ): Promise<{ activities: AddActivitiesSingle[]; additionalStreams: IPendingStream[] }> { switch (stream.value) { case 'members': { const members = await this.parseMembers(records, context) @@ -361,8 +362,8 @@ export class SlackIntegrationService extends IntegrationServiceBase { records: SlackMessages, stream: IIntegrationStream, context: IStepContext, - ): Promise<{ activities: AddActivitiesSingle[]; additionalStreams: IIntegrationStream[] }> { - const newStreams: IIntegrationStream[] = [] + ): Promise<{ activities: AddActivitiesSingle[]; additionalStreams: IPendingStream[] }> { + const newStreams: IPendingStream[] = [] const activities: AddActivitiesSingle[] = [] for (const record of records) { diff --git a/backend/src/serverless/integrations/services/integrations/stackOverflowIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/stackOverflowIntegrationService.ts index 671a56ee57..f258673c05 100644 --- a/backend/src/serverless/integrations/services/integrations/stackOverflowIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/stackOverflowIntegrationService.ts @@ -4,6 +4,7 @@ import { IStepContext, IIntegrationStream, IProcessStreamResults, + IPendingStream, } from '../../../../types/integration/stepResult' import { IntegrationType, PlatformType } from '../../../../types/integrationEnums' import { IntegrationServiceBase } from '../integrationServiceBase' @@ -64,7 +65,7 @@ export class StackOverlflowIntegrationService extends IntegrationServiceBase { * @param context context passed along worker messages * @returns an array of streams to process */ - async 
getStreams(context: IStepContext): Promise { + async getStreams(context: IStepContext): Promise { const tagStreams = context.pipelineData.tags.map((tag: string) => ({ value: `questions_by_tag:${tag}`, metadata: { @@ -149,7 +150,7 @@ export class StackOverlflowIntegrationService extends IntegrationServiceBase { const lastRecord = activities.length > 0 ? activities[0] : undefined // If we got results, we will want to check the next page - const nextPageStream: IIntegrationStream = + const nextPageStream: IPendingStream = questions.length > 0 && hasMore ? { value: stream.value, metadata: { ...(stream.metadata || {}), page: page + 1 } } : undefined @@ -219,7 +220,7 @@ export class StackOverlflowIntegrationService extends IntegrationServiceBase { const lastRecord = activities.length > 0 ? activities[0] : undefined // If we got results, we will want to check the next page - const nextPageStream: IIntegrationStream = + const nextPageStream: IPendingStream = answers.length > 0 && hasMore ? { value: stream.value, metadata: { ...(stream.metadata || {}), page: page + 1 } } : undefined @@ -285,7 +286,7 @@ export class StackOverlflowIntegrationService extends IntegrationServiceBase { const lastRecord = activities.length > 0 ? activities[0] : undefined // If we got results, we will want to check the next page - const nextPageStream: IIntegrationStream = + const nextPageStream: IPendingStream = questions.length > 0 && hasMore ? 
{ value: stream.value, metadata: { ...(stream.metadata || {}), page: page + 1 } } : undefined diff --git a/backend/src/serverless/integrations/services/integrations/twitterIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/twitterIntegrationService.ts index 98a7eaa195..b56747ef8b 100644 --- a/backend/src/serverless/integrations/services/integrations/twitterIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/twitterIntegrationService.ts @@ -5,6 +5,7 @@ import { IntegrationType, PlatformType } from '../../../../types/integrationEnum import { TWITTER_CONFIG } from '../../../../config' import { IIntegrationStream, + IPendingStream, IProcessStreamResults, IStepContext, IStreamResultOperation, @@ -47,7 +48,7 @@ export class TwitterIntegrationService extends IntegrationServiceBase { await service.createPredefined(TwitterMemberAttributes) } - async getStreams(context: IStepContext): Promise { + async getStreams(context: IStepContext): Promise { const hashtags = context.integration.settings.hashtags return ['followers', 'mentions'] diff --git a/backend/src/serverless/integrations/types/discordTypes.ts b/backend/src/serverless/integrations/types/discordTypes.ts index 622a49682e..98f9fc2629 100644 --- a/backend/src/serverless/integrations/types/discordTypes.ts +++ b/backend/src/serverless/integrations/types/discordTypes.ts @@ -1,5 +1,5 @@ import { AddActivitiesSingle } from './messageTypes' -import { IIntegrationStream } from '../../../types/integration/stepResult' +import { IPendingStream } from '../../../types/integration/stepResult' export interface DiscordGetChannelsInput { guildId: string @@ -23,7 +23,7 @@ export interface DiscordGetMembersInput { export interface DiscordStreamProcessResult { activities: AddActivitiesSingle[] - newStreams: IIntegrationStream[] + newStreams: IPendingStream[] } export interface DiscordParsedReponse { diff --git a/backend/src/services/conversationService.ts 
b/backend/src/services/conversationService.ts index b50a1b33e1..48bf98d4b3 100644 --- a/backend/src/services/conversationService.ts +++ b/backend/src/services/conversationService.ts @@ -114,7 +114,6 @@ export default class ConversationService extends LoggingBase { filter: { platform }, }) ).rows[0] - await integrationService.update(integration.id, { settings: { ...integration.settings, inviteLink: data.inviteLinks[platform] }, }) diff --git a/backend/src/services/integrationService.ts b/backend/src/services/integrationService.ts index c8473e0d11..e987aa6d62 100644 --- a/backend/src/services/integrationService.ts +++ b/backend/src/services/integrationService.ts @@ -10,7 +10,7 @@ import SequelizeRepository from '../database/repositories/sequelizeRepository' import IntegrationRepository from '../database/repositories/integrationRepository' import Error542 from '../errors/Error542' import track from '../segment/track' -import { IntegrationType, PlatformType } from '../types/integrationEnums' +import { PlatformType } from '../types/integrationEnums' import { getInstalledRepositories } from '../serverless/integrations/usecases/github/rest/getInstalledRepositories' import { sendNodeWorkerMessage } from '../serverless/utils/nodeWorkerSQS' import { NodeWorkerIntegrationProcessMessage } from '../types/mq/nodeWorkerIntegrationProcessMessage' @@ -18,6 +18,8 @@ import telemetryTrack from '../segment/telemetryTrack' import getToken from '../serverless/integrations/usecases/nango/getToken' import { getOrganizations } from '../serverless/integrations/usecases/linkedin/getOrganizations' import Error404 from '../errors/Error404' +import IntegrationRunRepository from '../database/repositories/integrationRunRepository' +import { IntegrationRunState } from '../types/integrationRunTypes' const discordToken = DISCORD_CONFIG.token2 || DISCORD_CONFIG.token @@ -28,10 +30,13 @@ export default class IntegrationService { this.options = options } - async createOrUpdate(data) { + async 
createOrUpdate(data, transaction?: any) { try { - const record = await IntegrationRepository.findByPlatform(data.platform, { ...this.options }) - const updatedRecord = await this.update(record.id, data) + const record = await IntegrationRepository.findByPlatform(data.platform, { + ...this.options, + transaction, + }) + const updatedRecord = await this.update(record.id, data, transaction) if (!IS_TEST_ENV) { track( 'Integration Updated', @@ -46,7 +51,7 @@ export default class IntegrationService { return updatedRecord } catch (error) { if (error.code === 404) { - const record = await this.create(data) + const record = await this.create(data, transaction) if (!IS_TEST_ENV) { track( 'Integration Created', @@ -85,44 +90,32 @@ export default class IntegrationService { return IntegrationRepository.findByPlatform(platform, this.options) } - async create(data) { - const transaction = await SequelizeRepository.createTransaction(this.options) - + async create(data, transaction?: any) { try { const record = await IntegrationRepository.create(data, { ...this.options, transaction, }) - await SequelizeRepository.commitTransaction(transaction) return record } catch (error) { - await SequelizeRepository.rollbackTransaction(transaction) - SequelizeRepository.handleUniqueFieldError(error, this.options.language, 'integration') - throw error } } - async update(id, data) { - const transaction = await SequelizeRepository.createTransaction(this.options) - + async update(id, data, transaction?: any) { try { const record = await IntegrationRepository.update(id, data, { ...this.options, transaction, }) - await SequelizeRepository.commitTransaction(transaction) - return record - } catch (error) { - await SequelizeRepository.rollbackTransaction(transaction) - - SequelizeRepository.handleUniqueFieldError(error, this.options.language, 'integration') + } catch (err) { + SequelizeRepository.handleUniqueFieldError(err, this.options.language, 'integration') - throw error + throw err } } @@ -168,20 
+161,32 @@ export default class IntegrationService { } async import(data, importHash) { - if (!importHash) { - throw new Error400(this.options.language, 'importer.errors.importHashRequired') - } + const transaction = await SequelizeRepository.createTransaction(this.options) - if (await this._isImportHashExistent(importHash)) { - throw new Error400(this.options.language, 'importer.errors.importHashExistent') - } + try { + if (!importHash) { + throw new Error400(this.options.language, 'importer.errors.importHashRequired') + } - const dataToCreate = { - ...data, - importHash, - } + if (await this._isImportHashExistent(importHash)) { + throw new Error400(this.options.language, 'importer.errors.importHashExistent') + } - return this.create(dataToCreate) + const dataToCreate = { + ...data, + importHash, + } + + const result = this.create(dataToCreate, transaction) + + await SequelizeRepository.commitTransaction(transaction) + + return await result + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + + throw err + } } async _isImportHashExistent(importHash) { @@ -231,68 +236,87 @@ export default class IntegrationService { * @returns integration object */ async connectGithub(code, installId, setupAction = 'install') { - if (setupAction === 'request') { - return this.createOrUpdate({ - platform: PlatformType.GITHUB, - status: 'waiting-approval', + const transaction = await SequelizeRepository.createTransaction(this.options) + + let integration + let run + try { + if (setupAction === 'request') { + return await this.createOrUpdate( + { + platform: PlatformType.GITHUB, + status: 'waiting-approval', + }, + transaction, + ) + } + + const GITHUB_AUTH_ACCESSTOKEN_URL = 'https://github.com/login/oauth/access_token' + // Getting the GitHub client ID and secret from the .env file. 
+ const CLIENT_ID = GITHUB_CONFIG.clientId + const CLIENT_SECRET = GITHUB_CONFIG.clientSecret + // Post to GitHub to get token + const tokenResponse = await axios({ + method: 'post', + url: GITHUB_AUTH_ACCESSTOKEN_URL, + data: { + client_id: CLIENT_ID, + client_secret: CLIENT_SECRET, + code, + }, }) - } - const GITHUB_AUTH_ACCESSTOKEN_URL = 'https://github.com/login/oauth/access_token' - // Getting the GitHub client ID and secret from the .env file. - const CLIENT_ID = GITHUB_CONFIG.clientId - const CLIENT_SECRET = GITHUB_CONFIG.clientSecret - // Post to GitHub to get token - const tokenResponse = await axios({ - method: 'post', - url: GITHUB_AUTH_ACCESSTOKEN_URL, - data: { - client_id: CLIENT_ID, - client_secret: CLIENT_SECRET, - code, - }, - }) + // Doing some processing on the token + let token = tokenResponse.data + token = token.slice(token.search('=') + 1, token.search('&')) + + try { + const requestWithAuth = request.defaults({ + headers: { + authorization: `token ${token}`, + }, + }) + await requestWithAuth('GET /user') + } catch { + throw new Error542( + `Invalid token for GitHub integration. Code: ${code}, setupAction: ${setupAction}. 
Token: ${token}`, + ) + } - // Doing some processing on the token - let token = tokenResponse.data - token = token.slice(token.search('=') + 1, token.search('&')) + // Using try/catch since we want to return an error if the installation is not validated properly + // Fetch install token from GitHub, this will allow us to get the + // repos that the user gave us access to + const installToken = await IntegrationService.getInstallToken(installId) - try { - const requestWithAuth = request.defaults({ - headers: { - authorization: `token ${token}`, + const repos = await getInstalledRepositories(installToken) + + integration = await this.createOrUpdate( + { + platform: PlatformType.GITHUB, + token, + settings: { repos, updateMemberAttributes: true }, + integrationIdentifier: installId, + status: 'in-progress', }, - }) - await requestWithAuth('GET /user') - } catch { - throw new Error542( - `Invalid token for GitHub integration. Code: ${code}, setupAction: ${setupAction}. Token: ${token}`, + transaction, ) - } - // Using try/catch since we want to return an error if the installation is not validated properly - // Fetch install token from GitHub, this will allow us to get the - // repos that the user gave us access to - const installToken = await IntegrationService.getInstallToken(installId) - - const repos = await getInstalledRepositories(installToken) + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: true, + state: IntegrationRunState.PENDING, + }) - const integration = await this.createOrUpdate({ - platform: PlatformType.GITHUB, - token, - settings: { repos, updateMemberAttributes: true }, - integrationIdentifier: installId, - status: 'in-progress', - }) + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } await sendNodeWorkerMessage( integration.tenantId, 
- new NodeWorkerIntegrationProcessMessage( - IntegrationType.GITHUB, - integration.tenantId, - true, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) return integration @@ -304,22 +328,39 @@ export default class IntegrationService { * @returns integration object */ async discordConnect(guildId) { - const integration = await this.createOrUpdate({ - platform: PlatformType.DISCORD, - integrationIdentifier: guildId, - token: discordToken, - settings: { channels: [], updateMemberAttributes: true }, - status: 'in-progress', - }) + const transaction = await SequelizeRepository.createTransaction(this.options) + + let integration + let run + + try { + integration = await this.createOrUpdate( + { + platform: PlatformType.DISCORD, + integrationIdentifier: guildId, + token: discordToken, + settings: { channels: [], updateMemberAttributes: true }, + status: 'in-progress', + }, + transaction, + ) + + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: true, + state: IntegrationRunState.PENDING, + }) + + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - IntegrationType.DISCORD, - integration.tenantId, - true, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) return integration @@ -351,23 +392,39 @@ export default class IntegrationService { } if (integration.status === 'pending-action') { - integration = await this.createOrUpdate({ - platform: PlatformType.LINKEDIN, - status: 'in-progress', - settings: integration.settings, - }) + const transaction = await SequelizeRepository.createTransaction(this.options) + let run + + try { + integration = await this.createOrUpdate( + { + platform: PlatformType.LINKEDIN, + status: 
'in-progress', + settings: integration.settings, + }, + transaction, + ) + + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: true, + state: IntegrationRunState.PENDING, + }) + + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - IntegrationType.LINKEDIN, - integration.tenantId, - true, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) return integration } + this.options.log.error('LinkedIn integration is not in pending-action status!') throw new Error404(this.options.language, 'errors.linkedin.cantOnboardWrongStatus') } @@ -408,21 +465,38 @@ export default class IntegrationService { organizations[0].inUse = true } - const integration = await this.createOrUpdate({ - platform: PlatformType.LINKEDIN, - settings: { organizations, updateMemberAttributes: true }, - status, - }) + const transaction = await SequelizeRepository.createTransaction(this.options) + let run + let integration + + try { + integration = await this.createOrUpdate( + { + platform: PlatformType.LINKEDIN, + settings: { organizations, updateMemberAttributes: true }, + status, + }, + transaction, + ) + + if (status === 'in-progress') { + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: true, + state: IntegrationRunState.PENDING, + }) + } + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } - if (status === 'in-progress') { + if (run) { await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - IntegrationType.LINKEDIN, - 
integration.tenantId, - true, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) } @@ -435,20 +509,37 @@ export default class IntegrationService { * @returns integration object */ async redditOnboard(subreddits) { - const integration = await this.createOrUpdate({ - platform: PlatformType.REDDIT, - settings: { subreddits, updateMemberAttributes: true }, - status: 'in-progress', - }) + const transaction = await SequelizeRepository.createTransaction(this.options) + + let integration + let run + + try { + integration = await this.createOrUpdate( + { + platform: PlatformType.REDDIT, + settings: { subreddits, updateMemberAttributes: true }, + status: 'in-progress', + }, + transaction, + ) + + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: true, + state: IntegrationRunState.PENDING, + }) + + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - IntegrationType.REDDIT, - integration.tenantId, - true, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) return integration @@ -460,25 +551,40 @@ export default class IntegrationService { * @returns integration object */ async devtoConnectOrUpdate(integrationData) { - const integration = await this.createOrUpdate({ - platform: PlatformType.DEVTO, - settings: { - users: integrationData.users, - organizations: integrationData.organizations, - articles: [], - updateMemberAttributes: true, - }, - status: 'in-progress', - }) + const transaction = await SequelizeRepository.createTransaction(this.options) + let integration + let run + + try { + integration = await this.createOrUpdate( + { + platform: PlatformType.DEVTO, + settings: { + users: integrationData.users, + organizations: 
integrationData.organizations, + articles: [], + updateMemberAttributes: true, + }, + status: 'in-progress', + }, + transaction, + ) + + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: true, + state: IntegrationRunState.PENDING, + }) + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - IntegrationType.DEVTO, - integration.tenantId, - true, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) return integration @@ -490,24 +596,39 @@ export default class IntegrationService { * @returns integration object */ async hackerNewsConnectOrUpdate(integrationData) { - const integration = await this.createOrUpdate({ - platform: PlatformType.HACKERNEWS, - settings: { - keywords: integrationData.keywords, - urls: integrationData.urls, - updateMemberAttributes: true, - }, - status: 'in-progress', - }) + const transaction = await SequelizeRepository.createTransaction(this.options) + let integration + let run + + try { + integration = await this.createOrUpdate( + { + platform: PlatformType.HACKERNEWS, + settings: { + keywords: integrationData.keywords, + urls: integrationData.urls, + updateMemberAttributes: true, + }, + status: 'in-progress', + }, + transaction, + ) + + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: true, + state: IntegrationRunState.PENDING, + }) + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - 
IntegrationType.HACKER_NEWS, - integration.tenantId, - true, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) return integration @@ -522,22 +643,37 @@ export default class IntegrationService { integrationData.settings = integrationData.settings || {} integrationData.settings.updateMemberAttributes = true - const integration = await this.createOrUpdate({ - platform: PlatformType.SLACK, - ...integrationData, - status: 'in-progress', - }) + const transaction = await SequelizeRepository.createTransaction(this.options) + let integration + let run + + try { + integration = await this.createOrUpdate( + { + platform: PlatformType.SLACK, + ...integrationData, + status: 'in-progress', + }, + transaction, + ) - const isOnboarding: boolean = !('channels' in integration.settings) + const isOnboarding: boolean = !('channels' in integration.settings) + + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: isOnboarding, + state: IntegrationRunState.PENDING, + }) + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - IntegrationType.SLACK, - integration.tenantId, - isOnboarding, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) return integration @@ -552,29 +688,45 @@ export default class IntegrationService { const { profileId, token, refreshToken } = integrationData const hashtags = !integrationData.hashtags || integrationData.hashtags === '' ? 
[] : integrationData.hashtags - const integration = await this.createOrUpdate({ - platform: PlatformType.TWITTER, - integrationIdentifier: profileId, - token, - refreshToken, - limitCount: 0, - limitLastResetAt: moment().format('YYYY-MM-DD HH:mm:ss'), - status: 'in-progress', - settings: { - followers: [], - hashtags: typeof hashtags === 'string' ? hashtags.split(',') : hashtags, - updateMemberAttributes: true, - }, - }) + + const transaction = await SequelizeRepository.createTransaction(this.options) + let integration + let run + + try { + integration = await this.createOrUpdate( + { + platform: PlatformType.TWITTER, + integrationIdentifier: profileId, + token, + refreshToken, + limitCount: 0, + limitLastResetAt: moment().format('YYYY-MM-DD HH:mm:ss'), + status: 'in-progress', + settings: { + followers: [], + hashtags: typeof hashtags === 'string' ? hashtags.split(',') : hashtags, + updateMemberAttributes: true, + }, + }, + transaction, + ) + + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: true, + state: IntegrationRunState.PENDING, + }) + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - IntegrationType.TWITTER, - integration.tenantId, - true, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) return integration @@ -586,24 +738,39 @@ export default class IntegrationService { * @returns integration object */ async stackOverflowConnectOrUpdate(integrationData) { - const integration = await this.createOrUpdate({ - platform: PlatformType.STACKOVERFLOW, - settings: { - tags: integrationData.tags, - keywords: integrationData.keywords, - updateMemberAttributes: true, - }, - status: 'in-progress', - }) + const transaction = await 
SequelizeRepository.createTransaction(this.options) + let integration + let run + + try { + integration = await this.createOrUpdate( + { + platform: PlatformType.STACKOVERFLOW, + settings: { + tags: integrationData.tags, + keywords: integrationData.keywords, + updateMemberAttributes: true, + }, + status: 'in-progress', + }, + transaction, + ) + + run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ + integrationId: integration.id, + tenantId: integration.tenantId, + onboarding: true, + state: IntegrationRunState.PENDING, + }) + await SequelizeRepository.commitTransaction(transaction) + } catch (err) { + await SequelizeRepository.rollbackTransaction(transaction) + throw err + } await sendNodeWorkerMessage( integration.tenantId, - new NodeWorkerIntegrationProcessMessage( - IntegrationType.STACKOVERFLOW, - integration.tenantId, - true, - integration.id, - ), + new NodeWorkerIntegrationProcessMessage(run.id), ) return integration diff --git a/backend/src/types/integration/stepResult.ts b/backend/src/types/integration/stepResult.ts index 7d8c247315..66d40ab896 100644 --- a/backend/src/types/integration/stepResult.ts +++ b/backend/src/types/integration/stepResult.ts @@ -2,10 +2,17 @@ import { IRepositoryOptions } from '../../database/repositories/IRepositoryOptio import { IServiceOptions } from '../../services/IServiceOptions' import { Logger } from '../../utils/logging' -export interface IIntegrationStream { +export interface IPendingStream { value: string metadata?: any - id?: string +} + +export interface IIntegrationStream extends IPendingStream { + id: string +} + +export interface IFailedIntegrationStream extends IIntegrationStream { + retries: number } export interface IStreamResultOperation { @@ -24,10 +31,10 @@ export interface IProcessStreamResults { lastRecordTimestamp?: number // if processing of the current stream results in new streams they should be returned here - newStreams?: IIntegrationStream[] + newStreams?: 
IPendingStream[] // if processing of the current stream results in the next page of the same stream it should be returned here - nextPageStream?: IIntegrationStream + nextPageStream?: IPendingStream // seconds to pause between continuing with integration processing for the remaining streams sleep?: number diff --git a/backend/src/types/integrationRunTypes.ts b/backend/src/types/integrationRunTypes.ts new file mode 100644 index 0000000000..c12f5addff --- /dev/null +++ b/backend/src/types/integrationRunTypes.ts @@ -0,0 +1,29 @@ +export enum IntegrationRunState { + DELAYED = 'delayed', + PENDING = 'pending', + PROCESSING = 'processing', + PROCESSED = 'processed', + ERROR = 'error', +} + +export interface IntegrationRun { + id: string + tenantId: string + integrationId: string | null + microserviceId: string | null + onboarding: boolean + state: IntegrationRunState + delayedUntil: string | null + processedAt: string | null + streamCount: number | null + error: any | null + createdAt: string | null +} + +export interface DbIntegrationRunCreateData { + tenantId: string + integrationId?: string + microserviceId?: string + onboarding: boolean + state: IntegrationRunState +} diff --git a/backend/src/types/integrationStreamTypes.ts b/backend/src/types/integrationStreamTypes.ts index 6ce9cf6935..848119de1b 100644 --- a/backend/src/types/integrationStreamTypes.ts +++ b/backend/src/types/integrationStreamTypes.ts @@ -1,5 +1,3 @@ -import { String } from 'aws-sdk/clients/cloudtrail' - export enum IntegrationStreamState { PENDING = 'pending', PROCESSING = 'processing', @@ -9,19 +7,24 @@ export enum IntegrationStreamState { export interface IntegrationStream { id: string - runId: String - integrationId: string + runId: string + tenantId: string + integrationId: string | null + microserviceId: string | null state: IntegrationStreamState name: string metadata: any processedAt: string | null error: any | null + retries: number | null createdAt: string } export interface 
DbIntegrationStreamCreateData { runId: string - integrationId: string + tenantId: string + integrationId?: string + microserviceId?: string name: string metadata: any } diff --git a/backend/src/types/mq/nodeWorkerIntegrationProcessMessage.ts b/backend/src/types/mq/nodeWorkerIntegrationProcessMessage.ts index c700855d03..0fa1abcae4 100644 --- a/backend/src/types/mq/nodeWorkerIntegrationProcessMessage.ts +++ b/backend/src/types/mq/nodeWorkerIntegrationProcessMessage.ts @@ -1,5 +1,4 @@ import { NodeWorkerMessageType } from '../../serverless/types/workerTypes' -import { IntegrationType } from '../integrationEnums' import { NodeWorkerMessageBase } from './nodeWorkerMessageBase' import { IIntegrationStream } from '../integration/stepResult' @@ -10,17 +9,7 @@ export interface IIntegrationStreamRetry { } export class NodeWorkerIntegrationProcessMessage extends NodeWorkerMessageBase { - constructor( - public readonly integrationType: IntegrationType, - public readonly tenantId: string, - public readonly onboarding: boolean, - public readonly integrationId?: string, - public readonly microserviceId?: string, - public readonly metadata?: any, - public readonly retryStreams?: IIntegrationStreamRetry[], - public readonly remainingStreams?: IIntegrationStream[], - public readonly totalDuration?: number, - ) { + constructor(public readonly runId: string, public readonly metadata?: any) { super(NodeWorkerMessageType.INTEGRATION_PROCESS) } } From 870d1688a4ae0090b17d48bad4df145f10e0c81f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 28 Mar 2023 10:45:00 +0200 Subject: [PATCH 3/5] fixes --- .../V1679825091__stream-delaying.sql | 2 -- .../repositories/integrationRunRepository.ts | 20 +------------------ backend/src/types/integrationRunTypes.ts | 1 - 3 files changed, 1 insertion(+), 22 deletions(-) diff --git a/backend/src/database/migrations/V1679825091__stream-delaying.sql b/backend/src/database/migrations/V1679825091__stream-delaying.sql index 
2b2e4576fc..8d494e1ae3 100644 --- a/backend/src/database/migrations/V1679825091__stream-delaying.sql +++ b/backend/src/database/migrations/V1679825091__stream-delaying.sql @@ -11,8 +11,6 @@ create table "integrationRuns" ( "processedAt" timestamptz null, error json null, - "processedStreamCount" int null, - "errorStreamCount" int null, "createdAt" timestamptz not null default now(), "updatedAt" timestamptz not null default now(), diff --git a/backend/src/database/repositories/integrationRunRepository.ts b/backend/src/database/repositories/integrationRunRepository.ts index 6bbee124e3..c41a60d3db 100644 --- a/backend/src/database/repositories/integrationRunRepository.ts +++ b/backend/src/database/repositories/integrationRunRepository.ts @@ -35,8 +35,6 @@ export default class IntegrationRunRepository extends RepositoryBase< "delayedUntil", "processedAt", error, - "processedStreamCount", - "errorStreamCount", "createdAt" from "integrationRuns" where state = :delayedState and "delayedUntil" <= now() @@ -95,8 +93,6 @@ export default class IntegrationRunRepository extends RepositoryBase< "delayedUntil", "processedAt", error, - "processedStreamCount", - "errorStreamCount", "createdAt" from "integrationRuns" where state in (:delayedState, :processingState, :pendingState) and ${condition} @@ -132,8 +128,6 @@ export default class IntegrationRunRepository extends RepositoryBase< "delayedUntil", "processedAt", error, - "processedStreamCount", - "errorStreamCount", "createdAt" from "integrationRuns" where id = :id; ` @@ -192,7 +186,6 @@ export default class IntegrationRunRepository extends RepositoryBase< state: data.state, delayedUntil: null, processedAt: null, - streamCount: null, error: null, createdAt: (result[0] as any).createdAt, } @@ -236,8 +229,6 @@ export default class IntegrationRunRepository extends RepositoryBase< set state = :state, "delayedUntil" = null, "processedAt" = null, - "processedStreamCount" = null, - "errorStreamCount" = null, error = null, "updatedAt" = 
now() where id = :id @@ -323,15 +314,7 @@ export default class IntegrationRunRepository extends RepositoryBase< const query = ` update "integrationRuns" - set "processedStreamCount" = (select count(s.id) - from "integrationStreams" s - where s."runId" = :id - and s.state = :successStreamState), - "errorStreamCount" = (select count(s.id) - from "integrationStreams" s - where s."runId" = :id - and s.state = :errorStreamState), - "processedAt" = case + set "processedAt" = case when (select count(s.id) = (count(s.id) filter ( where s.state = :successStreamState ) + count(s.id) filter (where s.state = :errorStreamState and s.retries >= :maxRetries)) @@ -363,7 +346,6 @@ export default class IntegrationRunRepository extends RepositoryBase< successStreamState: IntegrationStreamState.PROCESSED, errorStreamState: IntegrationStreamState.ERROR, successRunState: IntegrationRunState.PROCESSED, - processingRunState: IntegrationRunState.PROCESSING, errorRunState: IntegrationRunState.ERROR, maxRetries: 5, }, diff --git a/backend/src/types/integrationRunTypes.ts b/backend/src/types/integrationRunTypes.ts index c12f5addff..603e9a4ea2 100644 --- a/backend/src/types/integrationRunTypes.ts +++ b/backend/src/types/integrationRunTypes.ts @@ -15,7 +15,6 @@ export interface IntegrationRun { state: IntegrationRunState delayedUntil: string | null processedAt: string | null - streamCount: number | null error: any | null createdAt: string | null } From adf7758e87052e4cae167bf232eddd5854c7cb51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 28 Mar 2023 10:59:21 +0200 Subject: [PATCH 4/5] cleanup cron job --- .../src/bin/jobs/cleanUpIntegrationRuns.ts | 19 +++++++++++++++++++ backend/src/bin/jobs/index.ts | 2 ++ .../repositories/integrationRunRepository.ts | 12 ++++++++++++ 3 files changed, 33 insertions(+) create mode 100644 backend/src/bin/jobs/cleanUpIntegrationRuns.ts diff --git a/backend/src/bin/jobs/cleanUpIntegrationRuns.ts 
b/backend/src/bin/jobs/cleanUpIntegrationRuns.ts new file mode 100644 index 0000000000..7a32199670 --- /dev/null +++ b/backend/src/bin/jobs/cleanUpIntegrationRuns.ts @@ -0,0 +1,19 @@ +import { CrowdJob } from '../../types/jobTypes' +import SequelizeRepository from '../../database/repositories/sequelizeRepository' +import IntegrationRunRepository from '../../database/repositories/integrationRunRepository' + +const MAX_MONTHS_TO_KEEP = 3 + +const job: CrowdJob = { + name: 'Clean up old successful integration runs', + // run once every week on Sunday at 1AM + cronTime: '0 1 * * 0', + onTrigger: async () => { + const dbOptions = await SequelizeRepository.getDefaultIRepositoryOptions() + const repo = new IntegrationRunRepository(dbOptions) + + await repo.cleanupOldRuns(MAX_MONTHS_TO_KEEP) + }, +} + +export default job diff --git a/backend/src/bin/jobs/index.ts b/backend/src/bin/jobs/index.ts index 52ea31e0dc..be445a9e7c 100644 --- a/backend/src/bin/jobs/index.ts +++ b/backend/src/bin/jobs/index.ts @@ -8,6 +8,7 @@ import downgradeExpiredPlans from './downgradeExpiredPlans' import eagleEyeEmailDigestTicks from './eagleEyeEmailDigestTicks' import integrationDataChecker from './integrationDataChecker' import refreshSampleData from './refreshSampleData' +import cleanUpIntegrationRuns from './cleanUpIntegrationRuns' const jobs: CrowdJob[] = [ weeklyAnalyticsEmailsCoordinator, @@ -19,6 +20,7 @@ const jobs: CrowdJob[] = [ eagleEyeEmailDigestTicks, integrationDataChecker, refreshSampleData, + cleanUpIntegrationRuns, ] export default jobs diff --git a/backend/src/database/repositories/integrationRunRepository.ts b/backend/src/database/repositories/integrationRunRepository.ts index c41a60d3db..da84d4207c 100644 --- a/backend/src/database/repositories/integrationRunRepository.ts +++ b/backend/src/database/repositories/integrationRunRepository.ts @@ -359,4 +359,16 @@ export default class IntegrationRunRepository extends RepositoryBase< return (result[0] as any).state } + + async 
cleanupOldRuns(months: number): Promise<void> { + const seq = this.seq + + const cleanQuery = ` + delete from "integrationRuns" where state = 'processed' and "processedAt" < now() - interval '${months} months'; + ` + + await seq.query(cleanQuery, { + type: QueryTypes.DELETE, + }) + } } From e29e06f1d9e8644d4e303df66587b5fde1747f59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Wed, 29 Mar 2023 14:51:29 +0200 Subject: [PATCH 5/5] Moved max retries to config --- backend/config/default.json | 3 +++ backend/src/config/configTypes.ts | 4 ++++ backend/src/config/index.ts | 4 ++++ backend/src/database/repositories/integrationRunRepository.ts | 3 ++- 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/backend/config/default.json b/backend/config/default.json index a7cf3a7654..ce72b734ff 100644 --- a/backend/config/default.json +++ b/backend/config/default.json @@ -3,6 +3,9 @@ "port": 8080, "documentation": false }, + "integrationProcessing": { + "maxRetries": 5 + }, "sqs": {}, "s3": {}, "db": { diff --git a/backend/src/config/configTypes.ts b/backend/src/config/configTypes.ts index 7562e04c8b..bcf682e17f 100644 --- a/backend/src/config/configTypes.ts +++ b/backend/src/config/configTypes.ts @@ -212,3 +212,7 @@ export interface SlackAlertingConfiguration { export interface SampleDataConfiguration { tenantId: string } + +export interface IntegrationProcessingConfiguration { + maxRetries: number +} diff --git a/backend/src/config/index.ts b/backend/src/config/index.ts index da3c3f2fba..3212dc6020 100644 --- a/backend/src/config/index.ts +++ b/backend/src/config/index.ts @@ -28,6 +28,7 @@ import { StackExchangeConfiguration, SlackAlertingConfiguration, SampleDataConfiguration, + IntegrationProcessingConfiguration, } from './configTypes' // TODO-kube @@ -248,3 +249,6 @@ export const SAMPLE_DATA_CONFIG: SampleDataConfiguration = KUBE_MODE : { tenantId: process.env.SAMPLE_DATA_TENANT_ID, } + +export const INTEGRATION_PROCESSING_CONFIG: 
IntegrationProcessingConfiguration = + config.get('integrationProcessing') diff --git a/backend/src/database/repositories/integrationRunRepository.ts b/backend/src/database/repositories/integrationRunRepository.ts index da84d4207c..2a619ad5b0 100644 --- a/backend/src/database/repositories/integrationRunRepository.ts +++ b/backend/src/database/repositories/integrationRunRepository.ts @@ -8,6 +8,7 @@ import { import { IntegrationStreamState } from '../../types/integrationStreamTypes' import { IRepositoryOptions } from './IRepositoryOptions' import { RepositoryBase } from './repositoryBase' +import { INTEGRATION_PROCESSING_CONFIG } from '../../config' export default class IntegrationRunRepository extends RepositoryBase< IntegrationRun, @@ -347,7 +348,7 @@ export default class IntegrationRunRepository extends RepositoryBase< errorStreamState: IntegrationStreamState.ERROR, successRunState: IntegrationRunState.PROCESSED, errorRunState: IntegrationRunState.ERROR, - maxRetries: 5, + maxRetries: INTEGRATION_PROCESSING_CONFIG.maxRetries, }, type: QueryTypes.SELECT, transaction,