From 028c9530ff05b30f654196d197693beb4aeced2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 3 Mar 2026 14:41:34 +0100 Subject: [PATCH 1/2] fix: batch fetch repositories for integrations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../repositories/integrationRepository.ts | 83 ++++++++++++++++--- .../src/integrations/index.ts | 30 +++++++ .../src/repositories/index.ts | 47 +++++++++++ 3 files changed, 147 insertions(+), 13 deletions(-) diff --git a/backend/src/database/repositories/integrationRepository.ts b/backend/src/database/repositories/integrationRepository.ts index ae03b420ad..237c49d6ec 100644 --- a/backend/src/database/repositories/integrationRepository.ts +++ b/backend/src/database/repositories/integrationRepository.ts @@ -10,9 +10,13 @@ import { fetchGlobalNotConnectedIntegrations, fetchGlobalNotConnectedIntegrationsCount, getNangoMappingsForIntegration, + getNangoMappingsForIntegrations, } from '@crowd/data-access-layer/src/integrations' -import { SequelizeQueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' -import { getReposGroupedByOrg } from '@crowd/data-access-layer/src/repositories' +import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' +import { + getReposGroupedByOrg, + getReposGroupedByOrgForIntegrations, +} from '@crowd/data-access-layer/src/repositories' import { IntegrationRunState, PlatformType } from '@crowd/types' import SequelizeFilterUtils from '../utils/sequelizeFilterUtils' @@ -172,7 +176,7 @@ class IntegrationRepository { throw new Error404() } - return this._populateRelations(record) + return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options)) } static async findActiveIntegrationByPlatform(platform: PlatformType) { @@ -188,7 +192,7 @@ class IntegrationRepository { throw new Error404() } - return this._populateRelations(record) + return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options)) } /** @@ -213,7 +217,7 @@ class IntegrationRepository { throw new Error404() } - return Promise.all(records.map((record) => this._populateRelations(record))) + return this._populateRelationsForRows(records, SequelizeRepository.getQueryExecutor(options)) } static async findByStatus( @@ -263,7 +267,7 @@ class IntegrationRepository { throw new Error404() } - return this._populateRelations(record) + return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options)) } static async findById(id, options: IRepositoryOptions) { @@ -283,7 +287,7 @@ class IntegrationRepository { throw new Error404() } - return this._populateRelations(record) + return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options)) } static async count(filter, options: IRepositoryOptions) { @@ -474,7 +478,7 @@ class IntegrationRepository { transaction: SequelizeRepository.getTransaction(options), }) - rows = await this._populateRelationsForRows(rows) + rows = await this._populateRelationsForRows(rows, SequelizeRepository.getQueryExecutor(options)) // Some integrations (i.e GitHub, Discord, Discourse, Groupsio) receive new data via webhook post-onboarding. // We track their last processedAt separately, and not using updatedAt. @@ -573,23 +577,76 @@ class IntegrationRepository { })) } - static async _populateRelationsForRows(rows) { + static async _populateRelationsForRows(rows, qx: QueryExecutor) { if (!rows) { return rows } - return Promise.all(rows.map((record) => this._populateRelations(record))) + const records = rows.map((record) => record.get({ plain: true })) + + const nangoIntegrationIds = records + .filter((r) => r.platform === PlatformType.GITHUB_NANGO) + .map((r) => r.id) + + const githubIntegrationIds = records + .filter( + (r) => + (r.platform === PlatformType.GITHUB || r.platform === PlatformType.GITHUB_NANGO) && + r.settings?.orgs?.length > 0, + ) + .map((r) => r.id) + + const [allNangoMappings, allReposByOrg] = await Promise.all([ + getNangoMappingsForIntegrations(qx, nangoIntegrationIds), + getReposGroupedByOrgForIntegrations(qx, githubIntegrationIds), + ]) + + return records.map((output) => { + if (output.platform === PlatformType.GITHUB_NANGO) { + const nangoMapping = allNangoMappings[output.id] + if (nangoMapping && Object.keys(nangoMapping).length > 0) { + output.settings = { ...output.settings, nangoMapping } + } + } + + if ( + (output.platform === PlatformType.GITHUB || + output.platform === PlatformType.GITHUB_NANGO) && + output.settings?.orgs?.length > 0 + ) { + const reposByOrg = allReposByOrg[output.id] + + if (reposByOrg && Object.keys(reposByOrg).length > 0) { + output.settings = { + ...output.settings, + orgs: output.settings.orgs.map((org) => ({ + ...org, + repos: (reposByOrg[org.name] || []).map((r) => ({ + url: r.url, + name: r.name, + owner: r.owner, + forkedFrom: r.forkedFrom, + updatedAt: r.updatedAt, + })), + })), + } + } + + delete output.settings.repos + delete output.settings.unavailableRepos + } + + return output + }) } - static async _populateRelations(record) { + static async _populateRelations(record, qx: QueryExecutor) { if (!record) { return record } const output = record.get({ plain: true }) - const qx = new SequelizeQueryExecutor(record.sequelize) - // For github-nango integrations, populate settings.nangoMapping from dedicated table if (output.platform === PlatformType.GITHUB_NANGO) { const nangoMapping = await getNangoMappingsForIntegration(qx, output.id) diff --git a/services/libs/data-access-layer/src/integrations/index.ts b/services/libs/data-access-layer/src/integrations/index.ts index 3b0de9588c..4f53267eee 100644 --- a/services/libs/data-access-layer/src/integrations/index.ts +++ b/services/libs/data-access-layer/src/integrations/index.ts @@ -685,6 +685,36 @@ export async function getNangoMappingsForIntegration( return result } +export async function getNangoMappingsForIntegrations( + qx: QueryExecutor, + integrationIds: string[], +): Promise< + Record> +> { + if (integrationIds.length === 0) return {} + + const rows: INangoMappingRow[] = await qx.select( + `SELECT * FROM integration.nango_mapping WHERE "integrationId" IN ($(integrationIds:csv))`, + { integrationIds }, + ) + + const result: Record< + string, + Record + > = {} + for (const row of rows) { + if (!result[row.integrationId]) { + result[row.integrationId] = {} + } + result[row.integrationId][row.connectionId] = { + owner: row.owner, + repoName: row.repoName, + repositoryId: row.repositoryId, + } + } + return result +} + export async function getNangoMappingByConnectionId( qx: QueryExecutor, connectionId: string, diff --git a/services/libs/data-access-layer/src/repositories/index.ts b/services/libs/data-access-layer/src/repositories/index.ts index ec1b5b435e..0f0c2074d5 100644 --- a/services/libs/data-access-layer/src/repositories/index.ts +++ b/services/libs/data-access-layer/src/repositories/index.ts @@ -595,6 +595,53 @@ export async function getReposGroupedByOrg( return Object.fromEntries(groupBy(repos, (repo) => repo.owner)) } +export interface IGithubRepoForIntegrationWithSource extends IGithubRepoForIntegration { + sourceIntegrationId: string +} + +export async function getReposGroupedByOrgForIntegrations( + qx: QueryExecutor, + integrationIds: string[], +): Promise>> { + if (integrationIds.length === 0) return {} + + const repos: IGithubRepoForIntegrationWithSource[] = await qx.select( + ` + SELECT + r.url, + split_part(r.url, '/', -1) as name, + split_part(r.url, '/', -2) as owner, + r."forkedFrom", + r."updatedAt", + r."sourceIntegrationId" + FROM public.repositories r + WHERE r."sourceIntegrationId" IN ($(integrationIds:csv)) + AND r."deletedAt" IS NULL + ORDER BY r.url + `, + { integrationIds }, + ) + + const result: Record> = {} + for (const repo of repos) { + const intId = repo.sourceIntegrationId + if (!result[intId]) { + result[intId] = {} + } + if (!result[intId][repo.owner]) { + result[intId][repo.owner] = [] + } + result[intId][repo.owner].push({ + url: repo.url, + name: repo.name, + owner: repo.owner, + forkedFrom: repo.forkedFrom, + updatedAt: repo.updatedAt, + }) + } + return result +} + /** * Populates `settings.orgs[].repos` from the repositories table for github/github-nango integrations. * Used by worker services to inject repo data into settings before passing to stream processors. From 61689aa5309a41da199b2b96c938c1bbbf69694d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 3 Mar 2026 14:52:18 +0100 Subject: [PATCH 2/2] chore: small refactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../repositories/integrationRepository.ts | 12 +++---- .../src/jobs/nangoMonitoring.job.ts | 7 ++-- .../cron_service/src/jobs/nangoTrigger.job.ts | 5 +-- .../src/activities/nangoActivities.ts | 7 ++-- .../src/bin/check-disconnected-connections.ts | 6 ++-- .../bin/check-disconnected-integrations.ts | 5 +-- .../src/bin/check-nango-mapping.ts | 5 +-- .../apps/nango_worker/src/bin/full-resync.ts | 9 +++-- .../src/integrations/index.ts | 33 +++---------------- .../src/repositories/index.ts | 14 ++------ 10 files changed, 38 insertions(+), 65 deletions(-) diff --git a/backend/src/database/repositories/integrationRepository.ts b/backend/src/database/repositories/integrationRepository.ts index 237c49d6ec..388e3b4503 100644 --- a/backend/src/database/repositories/integrationRepository.ts +++ b/backend/src/database/repositories/integrationRepository.ts @@ -9,14 +9,10 @@ import { fetchGlobalIntegrationsStatusCount, fetchGlobalNotConnectedIntegrations, fetchGlobalNotConnectedIntegrationsCount, - getNangoMappingsForIntegration, getNangoMappingsForIntegrations, } from '@crowd/data-access-layer/src/integrations' import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' -import { - getReposGroupedByOrg, - getReposGroupedByOrgForIntegrations, -} from '@crowd/data-access-layer/src/repositories' +import { getReposGroupedByOrgForIntegrations } from '@crowd/data-access-layer/src/repositories' import { IntegrationRunState, PlatformType } from '@crowd/types' import SequelizeFilterUtils from '../utils/sequelizeFilterUtils' @@ -649,7 +645,8 @@ class IntegrationRepository { // For github-nango integrations, populate settings.nangoMapping from dedicated table if (output.platform === PlatformType.GITHUB_NANGO) { - const nangoMapping = await getNangoMappingsForIntegration(qx, output.id) + const allNangoMappings = await getNangoMappingsForIntegrations(qx, [output.id]) + const nangoMapping = allNangoMappings[output.id] || {} if (Object.keys(nangoMapping).length > 0) { output.settings = { ...output.settings, nangoMapping } } @@ -660,7 +657,8 @@ class IntegrationRepository { (output.platform === PlatformType.GITHUB || output.platform === PlatformType.GITHUB_NANGO) && output.settings?.orgs?.length > 0 ) { - const reposByOrg = await getReposGroupedByOrg(qx, output.id) + const allReposByOrg = await getReposGroupedByOrgForIntegrations(qx, [output.id]) + const reposByOrg = allReposByOrg[output.id] || {} // Only overwrite orgs[].repos from the repositories table if there are rows. // During the 'mapping' phase (legacy github connect), repos live in settings diff --git a/services/apps/cron_service/src/jobs/nangoMonitoring.job.ts b/services/apps/cron_service/src/jobs/nangoMonitoring.job.ts index 28ab30aacd..19b5af5dc0 100644 --- a/services/apps/cron_service/src/jobs/nangoMonitoring.job.ts +++ b/services/apps/cron_service/src/jobs/nangoMonitoring.job.ts @@ -12,7 +12,7 @@ import { INangoIntegrationData, fetchNangoCursorRowsForIntegration, fetchNangoIntegrationData, - getNangoMappingsForIntegration, + getNangoMappingsForIntegrations, } from '@crowd/data-access-layer/src/integrations' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getReposForGithubIntegration } from '@crowd/data-access-layer/src/repositories' @@ -73,7 +73,10 @@ const job: IJobDefinition = { for (const int of allIntegrations) { if (int.platform === PlatformType.GITHUB_NANGO) { // Fetch nango mappings from the dedicated table - const nangoMapping = await getNangoMappingsForIntegration(pgpQx(dbConnection), int.id) + const allNangoMappings = await getNangoMappingsForIntegrations(pgpQx(dbConnection), [ + int.id, + ]) + const nangoMapping = allNangoMappings[int.id] || {} // Check which repos are connected to nango by comparing repositories table vs nango_mapping const repoRows = await getReposForGithubIntegration(pgpQx(dbConnection), int.id) diff --git a/services/apps/cron_service/src/jobs/nangoTrigger.job.ts b/services/apps/cron_service/src/jobs/nangoTrigger.job.ts index 9eac2a0652..d7fe9819e9 100644 --- a/services/apps/cron_service/src/jobs/nangoTrigger.job.ts +++ b/services/apps/cron_service/src/jobs/nangoTrigger.job.ts @@ -5,7 +5,7 @@ import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/da import { fetchNangoIntegrationDataForCheck, fetchNangoLastCheckedAt, - getNangoMappingsForIntegration, + getNangoMappingsForIntegrations, } from '@crowd/data-access-layer/src/integrations' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { @@ -81,7 +81,8 @@ const job: IJobDefinition = { { owner: string; repoName: string; repositoryId: string | null } > = {} if (platform === NangoIntegration.GITHUB) { - nangoMapping = await getNangoMappingsForIntegration(qx, id) + const allNangoMappings = await getNangoMappingsForIntegrations(qx, [id]) + nangoMapping = allNangoMappings[id] || {} if (Object.keys(nangoMapping).length === 0) { // ignore non-nango github integrations continue diff --git a/services/apps/nango_worker/src/activities/nangoActivities.ts b/services/apps/nango_worker/src/activities/nangoActivities.ts index 3687e18fbc..87d086d94c 100644 --- a/services/apps/nango_worker/src/activities/nangoActivities.ts +++ b/services/apps/nango_worker/src/activities/nangoActivities.ts @@ -7,7 +7,7 @@ import { fetchIntegrationById, findIntegrationDataForNangoWebhookProcessing, getNangoCursor, - getNangoMappingsForIntegration, + getNangoMappingsForIntegrations, linkNangoMappingToRepository, removeGithubNangoConnection, removeNangoCursorsByConnection, @@ -326,10 +326,11 @@ export async function analyzeGithubIntegration( const finalRepos = Array.from(repoSet.values()) // fetch nango mappings from the dedicated table - const nangoMapping = await getNangoMappingsForIntegration( + const allNangoMappings = await getNangoMappingsForIntegrations( dbStoreQx(svc.postgres.writer), - integrationId, + [integrationId], ) + const nangoMapping = allNangoMappings[integrationId] || {} const connectionIds = Object.keys(nangoMapping) // determine which connections to delete if needed diff --git a/services/apps/nango_worker/src/bin/check-disconnected-connections.ts b/services/apps/nango_worker/src/bin/check-disconnected-connections.ts index 05ec10d147..8b51285ff9 100644 --- a/services/apps/nango_worker/src/bin/check-disconnected-connections.ts +++ b/services/apps/nango_worker/src/bin/check-disconnected-connections.ts @@ -1,7 +1,7 @@ import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' import { fetchNangoIntegrationData, - getNangoMappingsForIntegration, + getNangoMappingsForIntegrations, } from '@crowd/data-access-layer/src/integrations' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getServiceLogger } from '@crowd/logging' @@ -40,8 +40,8 @@ setImmediate(async () => { const connectionIds: string[] = [] for (const int of nangoIntegrations) { if (int.platform === PlatformType.GITHUB_NANGO) { - const nangoMapping = await getNangoMappingsForIntegration(qx, int.id) - connectionIds.push(...Object.keys(nangoMapping)) + const allNangoMappings = await getNangoMappingsForIntegrations(qx, [int.id]) + connectionIds.push(...Object.keys(allNangoMappings[int.id] || {})) } else { connectionIds.push(int.id) } diff --git a/services/apps/nango_worker/src/bin/check-disconnected-integrations.ts b/services/apps/nango_worker/src/bin/check-disconnected-integrations.ts index 3d0fe4fc28..3620ded26b 100644 --- a/services/apps/nango_worker/src/bin/check-disconnected-integrations.ts +++ b/services/apps/nango_worker/src/bin/check-disconnected-integrations.ts @@ -1,7 +1,7 @@ import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' import { fetchNangoDeletedIntegrationData, - getNangoMappingsForIntegration, + getNangoMappingsForIntegrations, } from '@crowd/data-access-layer/src/integrations' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getServiceLogger } from '@crowd/logging' @@ -46,7 +46,8 @@ setImmediate(async () => { for (const int of deletedNangoIntegrations) { if (int.platform === PlatformType.GITHUB_NANGO) { - const nangoMapping = await getNangoMappingsForIntegration(qx, int.id) + const allNangoMappings = await getNangoMappingsForIntegrations(qx, [int.id]) + const nangoMapping = allNangoMappings[int.id] || {} const connectionIdsForIntegration = Object.keys(nangoMapping) if (connectionIdsForIntegration.length > 0) { connectionIds.push(...connectionIdsForIntegration) diff --git a/services/apps/nango_worker/src/bin/check-nango-mapping.ts b/services/apps/nango_worker/src/bin/check-nango-mapping.ts index d60bec840d..02fec17654 100644 --- a/services/apps/nango_worker/src/bin/check-nango-mapping.ts +++ b/services/apps/nango_worker/src/bin/check-nango-mapping.ts @@ -2,7 +2,7 @@ import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/da import { fetchNangoCursorRowsForIntegration, fetchNangoIntegrationData, - getNangoMappingsForIntegration, + getNangoMappingsForIntegrations, } from '@crowd/data-access-layer/src/integrations' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getReposForGithubIntegration } from '@crowd/data-access-layer/src/repositories' @@ -43,7 +43,8 @@ async function collectStats(): Promise { for (const integration of integrations) { // Fetch nango mappings from the dedicated table - const nangoMapping = await getNangoMappingsForIntegration(qx, integration.id) + const allNangoMappings = await getNangoMappingsForIntegrations(qx, [integration.id]) + const nangoMapping = allNangoMappings[integration.id] || {} const connectionIds = Object.keys(nangoMapping) // Track connectionIds that don't have cursors diff --git a/services/apps/nango_worker/src/bin/full-resync.ts b/services/apps/nango_worker/src/bin/full-resync.ts index 0154dd37fd..58415e5bf7 100644 --- a/services/apps/nango_worker/src/bin/full-resync.ts +++ b/services/apps/nango_worker/src/bin/full-resync.ts @@ -2,7 +2,7 @@ import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/d import { clearNangoCursors, findIntegrationDataForNangoWebhookProcessing, - getNangoMappingsForIntegration, + getNangoMappingsForIntegrations, } from '@crowd/data-access-layer/src/integrations' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getServiceLogger } from '@crowd/logging' @@ -39,11 +39,10 @@ setImmediate(async () => { try { const toTrigger: string[] = [] if (integration.platform === PlatformType.GITHUB_NANGO) { - const nangoMapping = await getNangoMappingsForIntegration( - pgpQx(dbConnection), + const allNangoMappings = await getNangoMappingsForIntegrations(pgpQx(dbConnection), [ integration.id, - ) - toTrigger.push(...Object.keys(nangoMapping)) + ]) + toTrigger.push(...Object.keys(allNangoMappings[integration.id] || {})) } else if (integration.platform === PlatformType.GERRIT) { toTrigger.push(integration.id) } else { diff --git a/services/libs/data-access-layer/src/integrations/index.ts b/services/libs/data-access-layer/src/integrations/index.ts index 4f53267eee..9d0d46caea 100644 --- a/services/libs/data-access-layer/src/integrations/index.ts +++ b/services/libs/data-access-layer/src/integrations/index.ts @@ -633,7 +633,8 @@ export async function findNangoRepositoriesToBeRemoved( return [] } - const nangoMappings = await getNangoMappingsForIntegration(qx, integrationId) + const allNangoMappings = await getNangoMappingsForIntegrations(qx, [integrationId]) + const nangoMappings = allNangoMappings[integrationId] || {} if (Object.keys(nangoMappings).length === 0) { return [] @@ -664,33 +665,12 @@ export interface INangoMappingRow { updatedAt: string } -export async function getNangoMappingsForIntegration( - qx: QueryExecutor, - integrationId: string, -): Promise> { - const rows: INangoMappingRow[] = await qx.select( - `SELECT * FROM integration.nango_mapping WHERE "integrationId" = $(integrationId)`, - { integrationId }, - ) - - const result: Record = - {} - for (const row of rows) { - result[row.connectionId] = { - owner: row.owner, - repoName: row.repoName, - repositoryId: row.repositoryId, - } - } - return result -} +export type NangoMappingEntry = { owner: string; repoName: string; repositoryId: string | null } export async function getNangoMappingsForIntegrations( qx: QueryExecutor, integrationIds: string[], -): Promise< - Record> -> { +): Promise>> { if (integrationIds.length === 0) return {} const rows: INangoMappingRow[] = await qx.select( @@ -698,10 +678,7 @@ export async function getNangoMappingsForIntegrations( { integrationIds }, ) - const result: Record< - string, - Record - > = {} + const result: Record> = {} for (const row of rows) { if (!result[row.integrationId]) { result[row.integrationId] = {} diff --git a/services/libs/data-access-layer/src/repositories/index.ts b/services/libs/data-access-layer/src/repositories/index.ts index 0f0c2074d5..d0960fe6cc 100644 --- a/services/libs/data-access-layer/src/repositories/index.ts +++ b/services/libs/data-access-layer/src/repositories/index.ts @@ -1,4 +1,3 @@ -import { groupBy } from '@crowd/common' import { Logger } from '@crowd/logging' import { RedisCache, RedisClient } from '@crowd/redis' @@ -587,15 +586,7 @@ export async function getReposForGithubIntegration( ) } -export async function getReposGroupedByOrg( - qx: QueryExecutor, - integrationId: string, -): Promise> { - const repos = await getReposForGithubIntegration(qx, integrationId) - return Object.fromEntries(groupBy(repos, (repo) => repo.owner)) -} - -export interface IGithubRepoForIntegrationWithSource extends IGithubRepoForIntegration { +interface IGithubRepoForIntegrationWithSource extends IGithubRepoForIntegration { sourceIntegrationId: string } @@ -655,7 +646,8 @@ export async function populateGithubSettingsWithRepos( const s = settings as any if (!s?.orgs || !Array.isArray(s.orgs)) return settings - const reposByOrg = await getReposGroupedByOrg(qx, integrationId) + const allReposByOrg = await getReposGroupedByOrgForIntegrations(qx, [integrationId]) + const reposByOrg = allReposByOrg[integrationId] || {} // Only overwrite orgs[].repos if the repositories table has data. // During the 'mapping' phase (legacy github connect), repos live in settings