Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 71 additions & 16 deletions backend/src/database/repositories/integrationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import {
fetchGlobalIntegrationsStatusCount,
fetchGlobalNotConnectedIntegrations,
fetchGlobalNotConnectedIntegrationsCount,
getNangoMappingsForIntegration,
getNangoMappingsForIntegrations,
} from '@crowd/data-access-layer/src/integrations'
import { SequelizeQueryExecutor } from '@crowd/data-access-layer/src/queryExecutor'
import { getReposGroupedByOrg } from '@crowd/data-access-layer/src/repositories'
import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor'
import { getReposGroupedByOrgForIntegrations } from '@crowd/data-access-layer/src/repositories'
import { IntegrationRunState, PlatformType } from '@crowd/types'

import SequelizeFilterUtils from '../utils/sequelizeFilterUtils'
Expand Down Expand Up @@ -172,7 +172,7 @@ class IntegrationRepository {
throw new Error404()
}

return this._populateRelations(record)
return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options))
}

static async findActiveIntegrationByPlatform(platform: PlatformType) {
Expand All @@ -188,7 +188,7 @@ class IntegrationRepository {
throw new Error404()
}

return this._populateRelations(record)
return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options))
}

/**
Expand All @@ -213,7 +213,7 @@ class IntegrationRepository {
throw new Error404()
}

return Promise.all(records.map((record) => this._populateRelations(record)))
return this._populateRelationsForRows(records, SequelizeRepository.getQueryExecutor(options))
}

static async findByStatus(
Expand Down Expand Up @@ -263,7 +263,7 @@ class IntegrationRepository {
throw new Error404()
}

return this._populateRelations(record)
return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options))
}

static async findById(id, options: IRepositoryOptions) {
Expand All @@ -283,7 +283,7 @@ class IntegrationRepository {
throw new Error404()
}

return this._populateRelations(record)
return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options))
}

static async count(filter, options: IRepositoryOptions) {
Expand Down Expand Up @@ -474,7 +474,7 @@ class IntegrationRepository {
transaction: SequelizeRepository.getTransaction(options),
})

rows = await this._populateRelationsForRows(rows)
rows = await this._populateRelationsForRows(rows, SequelizeRepository.getQueryExecutor(options))

// Some integrations (i.e GitHub, Discord, Discourse, Groupsio) receive new data via webhook post-onboarding.
// We track their last processedAt separately, and not using updatedAt.
Expand Down Expand Up @@ -573,26 +573,80 @@ class IntegrationRepository {
}))
}

static async _populateRelationsForRows(rows) {
static async _populateRelationsForRows(rows, qx: QueryExecutor) {
if (!rows) {
return rows
}

return Promise.all(rows.map((record) => this._populateRelations(record)))
const records = rows.map((record) => record.get({ plain: true }))

const nangoIntegrationIds = records
.filter((r) => r.platform === PlatformType.GITHUB_NANGO)
.map((r) => r.id)

const githubIntegrationIds = records
.filter(
(r) =>
(r.platform === PlatformType.GITHUB || r.platform === PlatformType.GITHUB_NANGO) &&
r.settings?.orgs?.length > 0,
)
.map((r) => r.id)

const [allNangoMappings, allReposByOrg] = await Promise.all([
getNangoMappingsForIntegrations(qx, nangoIntegrationIds),
getReposGroupedByOrgForIntegrations(qx, githubIntegrationIds),
])

return records.map((output) => {
if (output.platform === PlatformType.GITHUB_NANGO) {
const nangoMapping = allNangoMappings[output.id]
if (nangoMapping && Object.keys(nangoMapping).length > 0) {
output.settings = { ...output.settings, nangoMapping }
}
}

if (
(output.platform === PlatformType.GITHUB ||
output.platform === PlatformType.GITHUB_NANGO) &&
output.settings?.orgs?.length > 0
) {
const reposByOrg = allReposByOrg[output.id]

if (reposByOrg && Object.keys(reposByOrg).length > 0) {
output.settings = {
...output.settings,
orgs: output.settings.orgs.map((org) => ({
...org,
repos: (reposByOrg[org.name] || []).map((r) => ({
url: r.url,
name: r.name,
owner: r.owner,
forkedFrom: r.forkedFrom,
updatedAt: r.updatedAt,
})),
})),
}
}

delete output.settings.repos
delete output.settings.unavailableRepos
}

return output
})
}

static async _populateRelations(record) {
static async _populateRelations(record, qx: QueryExecutor) {
if (!record) {
return record
}

const output = record.get({ plain: true })

const qx = new SequelizeQueryExecutor(record.sequelize)

// For github-nango integrations, populate settings.nangoMapping from dedicated table
if (output.platform === PlatformType.GITHUB_NANGO) {
const nangoMapping = await getNangoMappingsForIntegration(qx, output.id)
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [output.id])
const nangoMapping = allNangoMappings[output.id] || {}
if (Object.keys(nangoMapping).length > 0) {
output.settings = { ...output.settings, nangoMapping }
}
Expand All @@ -603,7 +657,8 @@ class IntegrationRepository {
(output.platform === PlatformType.GITHUB || output.platform === PlatformType.GITHUB_NANGO) &&
output.settings?.orgs?.length > 0
) {
const reposByOrg = await getReposGroupedByOrg(qx, output.id)
const allReposByOrg = await getReposGroupedByOrgForIntegrations(qx, [output.id])
const reposByOrg = allReposByOrg[output.id] || {}

// Only overwrite orgs[].repos from the repositories table if there are rows.
// During the 'mapping' phase (legacy github connect), repos live in settings
Expand Down
7 changes: 5 additions & 2 deletions services/apps/cron_service/src/jobs/nangoMonitoring.job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
INangoIntegrationData,
fetchNangoCursorRowsForIntegration,
fetchNangoIntegrationData,
getNangoMappingsForIntegration,
getNangoMappingsForIntegrations,
} from '@crowd/data-access-layer/src/integrations'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getReposForGithubIntegration } from '@crowd/data-access-layer/src/repositories'
Expand Down Expand Up @@ -73,7 +73,10 @@ const job: IJobDefinition = {
for (const int of allIntegrations) {
if (int.platform === PlatformType.GITHUB_NANGO) {
// Fetch nango mappings from the dedicated table
const nangoMapping = await getNangoMappingsForIntegration(pgpQx(dbConnection), int.id)
const allNangoMappings = await getNangoMappingsForIntegrations(pgpQx(dbConnection), [
int.id,
])
const nangoMapping = allNangoMappings[int.id] || {}

// Check which repos are connected to nango by comparing repositories table vs nango_mapping
const repoRows = await getReposForGithubIntegration(pgpQx(dbConnection), int.id)
Expand Down
5 changes: 3 additions & 2 deletions services/apps/cron_service/src/jobs/nangoTrigger.job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/da
import {
fetchNangoIntegrationDataForCheck,
fetchNangoLastCheckedAt,
getNangoMappingsForIntegration,
getNangoMappingsForIntegrations,
} from '@crowd/data-access-layer/src/integrations'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import {
Expand Down Expand Up @@ -81,7 +81,8 @@ const job: IJobDefinition = {
{ owner: string; repoName: string; repositoryId: string | null }
> = {}
if (platform === NangoIntegration.GITHUB) {
nangoMapping = await getNangoMappingsForIntegration(qx, id)
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [id])
nangoMapping = allNangoMappings[id] || {}
if (Object.keys(nangoMapping).length === 0) {
// ignore non-nango github integrations
continue
Expand Down
7 changes: 4 additions & 3 deletions services/apps/nango_worker/src/activities/nangoActivities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
fetchIntegrationById,
findIntegrationDataForNangoWebhookProcessing,
getNangoCursor,
getNangoMappingsForIntegration,
getNangoMappingsForIntegrations,
linkNangoMappingToRepository,
removeGithubNangoConnection,
removeNangoCursorsByConnection,
Expand Down Expand Up @@ -326,10 +326,11 @@ export async function analyzeGithubIntegration(
const finalRepos = Array.from(repoSet.values())

// fetch nango mappings from the dedicated table
const nangoMapping = await getNangoMappingsForIntegration(
const allNangoMappings = await getNangoMappingsForIntegrations(
dbStoreQx(svc.postgres.writer),
integrationId,
[integrationId],
)
const nangoMapping = allNangoMappings[integrationId] || {}
const connectionIds = Object.keys(nangoMapping)

// determine which connections to delete if needed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import {
fetchNangoIntegrationData,
getNangoMappingsForIntegration,
getNangoMappingsForIntegrations,
} from '@crowd/data-access-layer/src/integrations'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getServiceLogger } from '@crowd/logging'
Expand Down Expand Up @@ -40,8 +40,8 @@ setImmediate(async () => {
const connectionIds: string[] = []
for (const int of nangoIntegrations) {
if (int.platform === PlatformType.GITHUB_NANGO) {
const nangoMapping = await getNangoMappingsForIntegration(qx, int.id)
connectionIds.push(...Object.keys(nangoMapping))
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [int.id])
connectionIds.push(...Object.keys(allNangoMappings[int.id] || {}))
} else {
connectionIds.push(int.id)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import {
fetchNangoDeletedIntegrationData,
getNangoMappingsForIntegration,
getNangoMappingsForIntegrations,
} from '@crowd/data-access-layer/src/integrations'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getServiceLogger } from '@crowd/logging'
Expand Down Expand Up @@ -46,7 +46,8 @@ setImmediate(async () => {

for (const int of deletedNangoIntegrations) {
if (int.platform === PlatformType.GITHUB_NANGO) {
const nangoMapping = await getNangoMappingsForIntegration(qx, int.id)
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [int.id])
const nangoMapping = allNangoMappings[int.id] || {}
const connectionIdsForIntegration = Object.keys(nangoMapping)
if (connectionIdsForIntegration.length > 0) {
connectionIds.push(...connectionIdsForIntegration)
Expand Down
5 changes: 3 additions & 2 deletions services/apps/nango_worker/src/bin/check-nango-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/da
import {
fetchNangoCursorRowsForIntegration,
fetchNangoIntegrationData,
getNangoMappingsForIntegration,
getNangoMappingsForIntegrations,
} from '@crowd/data-access-layer/src/integrations'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getReposForGithubIntegration } from '@crowd/data-access-layer/src/repositories'
Expand Down Expand Up @@ -43,7 +43,8 @@ async function collectStats(): Promise<Stats> {

for (const integration of integrations) {
// Fetch nango mappings from the dedicated table
const nangoMapping = await getNangoMappingsForIntegration(qx, integration.id)
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [integration.id])
const nangoMapping = allNangoMappings[integration.id] || {}
const connectionIds = Object.keys(nangoMapping)

// Track connectionIds that don't have cursors
Expand Down
9 changes: 4 additions & 5 deletions services/apps/nango_worker/src/bin/full-resync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/d
import {
clearNangoCursors,
findIntegrationDataForNangoWebhookProcessing,
getNangoMappingsForIntegration,
getNangoMappingsForIntegrations,
} from '@crowd/data-access-layer/src/integrations'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getServiceLogger } from '@crowd/logging'
Expand Down Expand Up @@ -39,11 +39,10 @@ setImmediate(async () => {
try {
const toTrigger: string[] = []
if (integration.platform === PlatformType.GITHUB_NANGO) {
const nangoMapping = await getNangoMappingsForIntegration(
pgpQx(dbConnection),
const allNangoMappings = await getNangoMappingsForIntegrations(pgpQx(dbConnection), [
integration.id,
)
toTrigger.push(...Object.keys(nangoMapping))
])
toTrigger.push(...Object.keys(allNangoMappings[integration.id] || {}))
} else if (integration.platform === PlatformType.GERRIT) {
toTrigger.push(integration.id)
} else {
Expand Down
25 changes: 16 additions & 9 deletions services/libs/data-access-layer/src/integrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,8 @@ export async function findNangoRepositoriesToBeRemoved(
return []
}

const nangoMappings = await getNangoMappingsForIntegration(qx, integrationId)
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [integrationId])
const nangoMappings = allNangoMappings[integrationId] || {}

if (Object.keys(nangoMappings).length === 0) {
return []
Expand Down Expand Up @@ -664,19 +665,25 @@ export interface INangoMappingRow {
updatedAt: string
}

export async function getNangoMappingsForIntegration(
export type NangoMappingEntry = { owner: string; repoName: string; repositoryId: string | null }

export async function getNangoMappingsForIntegrations(
qx: QueryExecutor,
integrationId: string,
): Promise<Record<string, { owner: string; repoName: string; repositoryId: string | null }>> {
integrationIds: string[],
): Promise<Record<string, Record<string, NangoMappingEntry>>> {
if (integrationIds.length === 0) return {}

const rows: INangoMappingRow[] = await qx.select(
`SELECT * FROM integration.nango_mapping WHERE "integrationId" = $(integrationId)`,
{ integrationId },
`SELECT * FROM integration.nango_mapping WHERE "integrationId" IN ($(integrationIds:csv))`,
{ integrationIds },
)

const result: Record<string, { owner: string; repoName: string; repositoryId: string | null }> =
{}
const result: Record<string, Record<string, NangoMappingEntry>> = {}
for (const row of rows) {
result[row.connectionId] = {
if (!result[row.integrationId]) {
result[row.integrationId] = {}
}
result[row.integrationId][row.connectionId] = {
owner: row.owner,
repoName: row.repoName,
repositoryId: row.repositoryId,
Expand Down
Loading