Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"port": 8080,
"documentation": false
},
"integrationProcessing": {
"maxRetries": 5
},
"sqs": {},
"s3": {},
"db": {
Expand Down
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"format": "npx prettier --write .",
"tsc-check": "tsc --noEmit",
"script:process-integration": "SERVICE=script ts-node ./src/bin/scripts/process-integration.ts",
"script:continue-run": "SERVICE=script ts-node ./src/bin/scripts/continue-run.ts",
"script:change-tenant-plan": "SERVICE=script ts-node ./src/bin/scripts/change-tenant-plan.ts",
"script:process-webhook": "SERVICE=script ts-node ./src/bin/scripts/process-webhook.ts",
"script:send-weekly-analytics-email": "SERVICE=script ts-node ./src/bin/scripts/send-weekly-analytics-email.ts",
Expand Down
4 changes: 3 additions & 1 deletion backend/src/bin/discord-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,13 @@ setImmediate(async () => {
}

if (triggerCheck) {
const repoOptions = await SequelizeRepository.getDefaultIRepositoryOptions()

const integrations = await IntegrationRepository.findAllActive(PlatformType.DISCORD)
if (integrations.length > 0) {
log.warn(`Found ${integrations.length} integrations to trigger check for!`)
const service = new DiscordIntegrationService()
await service.triggerIntegrationCheck(integrations)
await service.triggerIntegrationCheck(integrations, repoOptions)
} else {
log.warn('Found no integrations to trigger check for!')
}
Expand Down
19 changes: 19 additions & 0 deletions backend/src/bin/jobs/cleanUpIntegrationRuns.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { CrowdJob } from '../../types/jobTypes'
import SequelizeRepository from '../../database/repositories/sequelizeRepository'
import IntegrationRunRepository from '../../database/repositories/integrationRunRepository'

const MAX_MONTHS_TO_KEEP = 3

const job: CrowdJob = {
name: 'Clean up old successful integration runs',
// run once every week on Sunday at 1AM
cronTime: '0 1 * * 0',
onTrigger: async () => {
const dbOptions = await SequelizeRepository.getDefaultIRepositoryOptions()
const repo = new IntegrationRunRepository(dbOptions)

await repo.cleanupOldRuns(MAX_MONTHS_TO_KEEP)
},
}

export default job
2 changes: 2 additions & 0 deletions backend/src/bin/jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import downgradeExpiredPlans from './downgradeExpiredPlans'
import eagleEyeEmailDigestTicks from './eagleEyeEmailDigestTicks'
import integrationDataChecker from './integrationDataChecker'
import refreshSampleData from './refreshSampleData'
import cleanUpIntegrationRuns from './cleanUpIntegrationRuns'

const jobs: CrowdJob[] = [
weeklyAnalyticsEmailsCoordinator,
Expand All @@ -19,6 +20,7 @@ const jobs: CrowdJob[] = [
eagleEyeEmailDigestTicks,
integrationDataChecker,
refreshSampleData,
cleanUpIntegrationRuns,
]

export default jobs
82 changes: 82 additions & 0 deletions backend/src/bin/scripts/continue-run.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import commandLineArgs from 'command-line-args'
import commandLineUsage from 'command-line-usage'
import * as fs from 'fs'
import path from 'path'
import { createServiceLogger } from '../../utils/logging'
import SequelizeRepository from '../../database/repositories/sequelizeRepository'
import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS'
import { NodeWorkerIntegrationProcessMessage } from '../../types/mq/nodeWorkerIntegrationProcessMessage'
import IntegrationRunRepository from '../../database/repositories/integrationRunRepository'
import { IntegrationRunState } from '../../types/integrationRunTypes'

const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8')

const log = createServiceLogger()

const options = [
{
name: 'run',
alias: 'r',
typeLabel: '{underline runId}',
type: String,
description:
'The unique ID of integration run that you would like to continue processing. Use comma delimiter when sending multiple integration runs.',
},
{
name: 'help',
alias: 'h',
type: Boolean,
description: 'Print this usage guide.',
},
]
const sections = [
{
content: banner,
raw: true,
},
{
header: 'Continue Processing Integration Run',
content: 'Trigger processing of integration run.',
},
{
header: 'Options',
optionList: options,
},
]

const usage = commandLineUsage(sections)
const parameters = commandLineArgs(options)

if (parameters.help && !parameters.run) {
console.log(usage)
} else {
setImmediate(async () => {
const options = await SequelizeRepository.getDefaultIRepositoryOptions()

const runRepo = new IntegrationRunRepository(options)

const runIds = parameters.run.split(',')
for (const runId of runIds) {
const run = await runRepo.findById(runId)

if (!run) {
log.error({ runId }, 'Integration run not found!')
process.exit(1)
} else {
await log.info({ runId }, 'Integration run found - triggering SQS message!')

if (run.state !== IntegrationRunState.PENDING) {
log.warn(
{ currentState: run.state },
`Setting integration state to ${IntegrationRunState.PENDING}!`,
)
await runRepo.restart(run.id)
}

await sendNodeWorkerMessage(run.tenantId, new NodeWorkerIntegrationProcessMessage(run.id))
}
}

process.exit(0)
})
}
51 changes: 37 additions & 14 deletions backend/src/bin/scripts/process-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import SequelizeRepository from '../../database/repositories/sequelizeRepository
import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS'
import { NodeWorkerIntegrationProcessMessage } from '../../types/mq/nodeWorkerIntegrationProcessMessage'
import IntegrationRepository from '../../database/repositories/integrationRepository'
import IntegrationRunRepository from '../../database/repositories/integrationRunRepository'
import { IntegrationRunState } from '../../types/integrationRunTypes'

const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8')

Expand Down Expand Up @@ -65,19 +67,31 @@ if (parameters.help || (!parameters.integration && !parameters.platform)) {
const onboarding = parameters.onboarding
const options = await SequelizeRepository.getDefaultIRepositoryOptions()

const runRepo = new IntegrationRunRepository(options)

if (parameters.platform) {
const integrations = await IntegrationRepository.findAllActive(parameters.platform)
for (const i of integrations) {
const integration = i as any
log.info({ integrationId: integration.id, onboarding }, 'Triggering SQS message!')

const existingRun = await runRepo.findLastProcessingRun(integration.id)

if (existingRun && existingRun.onboarding) {
log.error('Integration is already processing, skipping!')
return
}

const run = await runRepo.create({
integrationId: integration.id,
tenantId: integration.tenantId,
onboarding,
state: IntegrationRunState.PENDING,
})

await sendNodeWorkerMessage(
integration.tenantId,
new NodeWorkerIntegrationProcessMessage(
integration.platform,
integration.tenantId,
onboarding,
integration.id,
),
new NodeWorkerIntegrationProcessMessage(run.id),
)
}
} else {
Expand All @@ -92,15 +106,24 @@ if (parameters.help || (!parameters.integration && !parameters.platform)) {
process.exit(1)
} else {
log.info({ integrationId, onboarding }, 'Integration found - triggering SQS message!')
await sendNodeWorkerMessage(
integration.tenantId,
new NodeWorkerIntegrationProcessMessage(
integration.platform,
integration.tenantId,

const existingRun = await runRepo.findLastProcessingRun(integration.id)

if (existingRun && existingRun.onboarding) {
log.error('Integration is already processing, skipping!')
} else {
const run = await runRepo.create({
integrationId: integration.id,
tenantId: integration.tenantId,
onboarding,
integration.id,
),
)
state: IntegrationRunState.PENDING,
})

await sendNodeWorkerMessage(
integration.tenantId,
new NodeWorkerIntegrationProcessMessage(run.id),
)
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions backend/src/config/configTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,7 @@ export interface SlackAlertingConfiguration {
export interface SampleDataConfiguration {
tenantId: string
}

export interface IntegrationProcessingConfiguration {
maxRetries: number
}
4 changes: 4 additions & 0 deletions backend/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
StackExchangeConfiguration,
SlackAlertingConfiguration,
SampleDataConfiguration,
IntegrationProcessingConfiguration,
} from './configTypes'

// TODO-kube
Expand Down Expand Up @@ -248,3 +249,6 @@ export const SAMPLE_DATA_CONFIG: SampleDataConfiguration = KUBE_MODE
: {
tenantId: process.env.SAMPLE_DATA_TENANT_ID,
}

export const INTEGRATION_PROCESSING_CONFIG: IntegrationProcessingConfiguration =
config.get<IntegrationProcessingConfiguration>('integrationProcessing')
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
drop table "integrationStreams";
drop table "integrationRuns";
48 changes: 48 additions & 0 deletions backend/src/database/migrations/V1679825091__stream-delaying.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
create table "integrationRuns" (
id uuid not null,
"tenantId" uuid not null,
"integrationId" uuid null,
"microserviceId" uuid null,

onboarding boolean not null,
state varchar(255) not null,

"delayedUntil" timestamptz null,

"processedAt" timestamptz null,
error json null,

"createdAt" timestamptz not null default now(),
"updatedAt" timestamptz not null default now(),

foreign key ("tenantId") references tenants (id) on delete cascade,
foreign key ("integrationId") references integrations (id) on delete cascade,
foreign key ("microserviceId") references microservices (id) on delete cascade,
primary key (id)
);

create table "integrationStreams" (
id uuid not null,
"runId" uuid not null,
"tenantId" uuid not null,
"integrationId" uuid null,
"microserviceId" uuid null,

state varchar(255) not null,

name text not null,
metadata json not null,

"processedAt" timestamptz null,
error json null,
retries int null,

"createdAt" timestamptz not null default now(),
"updatedAt" timestamptz not null default now(),

foreign key ("runId") references "integrationRuns" (id) on delete cascade,
foreign key ("tenantId") references tenants (id) on delete cascade,
foreign key ("integrationId") references integrations (id) on delete cascade,
foreign key ("microserviceId") references microservices (id) on delete cascade,
primary key (id)
);
Loading