From d11662bec4e33d477c054e52a0e2f20594becf36 Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 11 Nov 2025 00:05:21 -0800 Subject: [PATCH 1/8] wip fix --- packages/backend/src/connectionManager.ts | 13 ++- packages/backend/src/constants.ts | 22 ++++- packages/backend/src/index.ts | 98 ++++++++++++++--------- packages/backend/src/repoIndexManager.ts | 9 ++- 4 files changed, 98 insertions(+), 44 deletions(-) diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts index ee17543a..2d7316cc 100644 --- a/packages/backend/src/connectionManager.ts +++ b/packages/backend/src/connectionManager.ts @@ -11,6 +11,7 @@ import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js"; import { syncSearchContexts } from "./ee/syncSearchContexts.js"; import { captureEvent } from "./posthog.js"; import { PromClient } from "./promClient.js"; +import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS, SHUTDOWN_SIGNALS } from "./constants.js"; const LOG_TAG = 'connection-manager'; const logger = createLogger(LOG_TAG); @@ -60,6 +61,7 @@ export class ConnectionManager { this.worker.on('completed', this.onJobCompleted.bind(this)); this.worker.on('failed', this.onJobFailed.bind(this)); + // this.worker.on('graceful-timeout', this.onJobFailed.bind(this)); this.worker.on('stalled', this.onJobStalled.bind(this)); this.worker.on('error', this.onWorkerError.bind(this)); } @@ -156,6 +158,9 @@ export class ConnectionManager { // @note: We aren't actually doing anything with this atm. const abortController = new AbortController(); + logger.info('Waiting for 60 seconds...'); + await new Promise(resolve => setTimeout(resolve, 60 * 1000)); + const { connection: { config: rawConnectionConfig, orgId } } = await this.db.connectionSyncJob.update({ where: { id: jobId, @@ -392,8 +397,12 @@ export class ConnectionManager { if (this.interval) { clearInterval(this.interval); } - await this.worker.close(); - await this.queue.close(); + + await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS); + // @note: As of groupmq v1.0.0, queue.close() will just close the underlying + // redis connection. Since we share the same redis client between + // @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900 + // await this.queue.close(); } } diff --git a/packages/backend/src/constants.ts b/packages/backend/src/constants.ts index a52d822e..b11f5102 100644 --- a/packages/backend/src/constants.ts +++ b/packages/backend/src/constants.ts @@ -10,4 +10,24 @@ export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES: CodeHostType[] = [ ]; export const REPOS_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'repos'); -export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index'); \ No newline at end of file +export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index'); + +// Maximum time to wait for current job to finish +export const GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS = 5 * 1000; // 5 seconds + +// List of shutdown signals +export const SHUTDOWN_SIGNALS: string[] = [ + 'SIGHUP', + 'SIGINT', + 'SIGQUIT', + 'SIGILL', + 'SIGTRAP', + 'SIGABRT', + 'SIGBUS', + 'SIGFPE', + 'SIGSEGV', + 'SIGUSR2', + 'SIGTERM', + // @note: SIGKILL and SIGSTOP cannot have listeners installed. 
+ // @see: https://nodejs.org/api/process.html#signal-events +]; diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 5e6d6ba0..75344352 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -1,5 +1,6 @@ import "./instrument.js"; +import * as Sentry from "@sentry/node"; import { PrismaClient } from "@sourcebot/db"; import { createLogger, env, getConfigSettings, getDBConnectionString, hasEntitlement } from "@sourcebot/shared"; import 'express-async-errors'; @@ -9,7 +10,7 @@ import { Redis } from 'ioredis'; import { Api } from "./api.js"; import { ConfigManager } from "./configManager.js"; import { ConnectionManager } from './connectionManager.js'; -import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js'; +import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, SHUTDOWN_SIGNALS } from './constants.js'; import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js"; import { GithubAppManager } from "./ee/githubAppManager.js"; import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js'; @@ -17,6 +18,7 @@ import { shutdownPosthog } from "./posthog.js"; import { PromClient } from './promClient.js'; import { RepoIndexManager } from "./repoIndexManager.js"; + const logger = createLogger('backend-entrypoint'); const reposPath = REPOS_CACHE_DIR; @@ -83,45 +85,65 @@ const api = new Api( logger.info('Worker started.'); -const cleanup = async (signal: string) => { - logger.info(`Received ${signal}, cleaning up...`); - - const shutdownTimeout = 30000; // 30 seconds - - try { - await Promise.race([ - Promise.all([ - repoIndexManager.dispose(), - connectionManager.dispose(), - repoPermissionSyncer.dispose(), - accountPermissionSyncer.dispose(), - configManager.dispose(), - ]), - new Promise((_, reject) => - setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout) - ) - ]); - logger.info('All workers shut down gracefully'); - } catch (error) { - logger.warn('Shutdown timeout or error, forcing exit:', error instanceof Error ? 
error.message : String(error)); + + +const listenToShutdownSignals = () => { + const signals = SHUTDOWN_SIGNALS; + + let receivedSignal = false; + + const cleanup = async (signal: string) => { + try { + if (receivedSignal) { + logger.debug(`Recieved repeat signal ${signal}, ignoring.`); + return; + } + receivedSignal = true; + + logger.info(`Received ${signal}, cleaning up...`); + + await repoIndexManager.dispose() + await connectionManager.dispose() + await repoPermissionSyncer.dispose() + await accountPermissionSyncer.dispose() + await promClient.dispose() + await configManager.dispose() + + logger.info('Disposed all workers'); + + await prisma.$disconnect(); + await redis.quit(); + await api.dispose(); + await shutdownPosthog(); + + + logger.info('All workers shut down gracefully'); + + signals.forEach(sig => process.removeListener(sig, cleanup)); + process.kill(process.pid, signal); + } catch (error) { + Sentry.captureException(error); + logger.error('Error shutting down worker:', error); + process.exit(1); + } } - await prisma.$disconnect(); - await redis.quit(); - await api.dispose(); - await shutdownPosthog(); -} + signals.forEach(signal => { + process.on(signal, cleanup); + }); -process.on('SIGINT', () => cleanup('SIGINT').finally(() => process.exit(0))); -process.on('SIGTERM', () => cleanup('SIGTERM').finally(() => process.exit(0))); + // Register handlers for uncaught exceptions and unhandled rejections + process.on('uncaughtException', (err) => { + logger.error(`Uncaught exception: ${err.message}`); + cleanup('uncaughtException').finally(() => process.exit(1)); + }); -// Register handlers for uncaught exceptions and unhandled rejections -process.on('uncaughtException', (err) => { - logger.error(`Uncaught exception: ${err.message}`); - cleanup('uncaughtException').finally(() => process.exit(1)); -}); + process.on('unhandledRejection', (reason, promise) => { + logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`); + cleanup('unhandledRejection').finally(() => process.exit(1)); + }); -process.on('unhandledRejection', (reason, promise) => { - logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`); - cleanup('unhandledRejection').finally(() => process.exit(1)); -}); + +} + +listenToShutdownSignals(); diff --git a/packages/backend/src/repoIndexManager.ts b/packages/backend/src/repoIndexManager.ts index 17ed2d8a..9370c85b 100644 --- a/packages/backend/src/repoIndexManager.ts +++ b/packages/backend/src/repoIndexManager.ts @@ -7,7 +7,7 @@ import { readdir, rm } from 'fs/promises'; import { Job, Queue, ReservedJob, Worker } from "groupmq"; import { Redis } from 'ioredis'; import micromatch from 'micromatch'; -import { INDEX_CACHE_DIR } from './constants.js'; +import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS, INDEX_CACHE_DIR } from './constants.js'; import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName, getTags, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js'; import { captureEvent } from './posthog.js'; import { PromClient } from './promClient.js'; @@ -549,8 +549,11 @@ export class RepoIndexManager { if (this.interval) { clearInterval(this.interval); } - await this.worker.close(); - await this.queue.close(); + await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS); + // @note: As of groupmq v1.0.0, queue.close() will just close the underlying + // redis connection. 
Since we share the same redis client between + // @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900 + // await this.queue.close(); } } From 85024a32acfaa69773b545e72ec181a7dfd394fe Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 11 Nov 2025 14:48:09 -0800 Subject: [PATCH 2/8] workaround for groupmq deadlock issue --- packages/backend/src/connectionManager.ts | 75 ++++++++++++++++++++--- packages/backend/src/repoCompileUtils.ts | 4 +- 2 files changed, 67 insertions(+), 12 deletions(-) diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts index 2d7316cc..f9ee3010 100644 --- a/packages/backend/src/connectionManager.ts +++ b/packages/backend/src/connectionManager.ts @@ -11,11 +11,12 @@ import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js"; import { syncSearchContexts } from "./ee/syncSearchContexts.js"; import { captureEvent } from "./posthog.js"; import { PromClient } from "./promClient.js"; -import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS, SHUTDOWN_SIGNALS } from "./constants.js"; +import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS } from "./constants.js"; const LOG_TAG = 'connection-manager'; const logger = createLogger(LOG_TAG); const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`); +const QUEUE_NAME = 'connection-sync-queue'; type JobPayload = { jobId: string, @@ -31,19 +32,19 @@ type JobResult = { const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 2; // 2 hour timeout export class ConnectionManager { - private worker: Worker; + private worker: Worker; private queue: Queue; private interval?: NodeJS.Timeout; constructor( private db: PrismaClient, private settings: Settings, - redis: Redis, + private redis: Redis, private promClient: PromClient, ) { this.queue = new Queue({ redis, - namespace: 'connection-sync-queue', + namespace: QUEUE_NAME, jobTimeoutMs: JOB_TIMEOUT_MS, maxAttempts: 3, logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true', @@ -61,7 +62,7 @@ export class ConnectionManager { this.worker.on('completed', this.onJobCompleted.bind(this)); this.worker.on('failed', this.onJobFailed.bind(this)); - // this.worker.on('graceful-timeout', this.onJobFailed.bind(this)); + this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this)); this.worker.on('stalled', this.onJobStalled.bind(this)); this.worker.on('error', this.onWorkerError.bind(this)); } @@ -130,6 +131,7 @@ export class ConnectionManager { }); for (const job of jobs) { + logger.info(`Scheduling job ${job.id} for connection ${job.connection.name} (id: ${job.connectionId})`); await this.queue.add({ groupId: `connection:${job.connectionId}`, data: { @@ -152,15 +154,26 @@ export class ConnectionManager { const logger = createJobLogger(jobId); logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`); + const currentStatus = await this.db.connectionSyncJob.findUniqueOrThrow({ + where: { + id: jobId, + }, + select: { + status: true, + } + }); + + if (currentStatus.status !== ConnectionSyncJobStatus.PENDING) { + throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING}. Actual: ${currentStatus.status}. Skipping.`); + } + + this.promClient.pendingConnectionSyncJobs.dec({ connection: connectionName }); this.promClient.activeConnectionSyncJobs.inc({ connection: connectionName }); // @note: We aren't actually doing anything with this atm. 
const abortController = new AbortController(); - logger.info('Waiting for 60 seconds...'); - await new Promise(resolve => setTimeout(resolve, 60 * 1000)); - const { connection: { config: rawConnectionConfig, orgId } } = await this.db.connectionSyncJob.update({ where: { id: jobId, @@ -183,7 +196,7 @@ export class ConnectionManager { const result = await (async () => { switch (config.type) { case 'github': { - return await compileGithubConfig(config, job.data.connectionId, abortController); + return await compileGithubConfig(config, job.data.connectionId, abortController.signal); } case 'gitlab': { return await compileGitlabConfig(config, job.data.connectionId); @@ -205,7 +218,7 @@ export class ConnectionManager { } } })(); - + let { repoData, warnings } = result; await this.db.connectionSyncJob.update({ @@ -388,6 +401,33 @@ export class ConnectionManager { }); }); + private onJobGracefulTimeout = async (job: Job) => + groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => { + const logger = createJobLogger(job.id); + + const { connection } = await this.db.connectionSyncJob.update({ + where: { id: job.id }, + data: { + status: ConnectionSyncJobStatus.FAILED, + completedAt: new Date(), + errorMessage: 'Job timed out', + }, + select: { + connection: true, + } + }); + + this.promClient.activeConnectionSyncJobs.dec({ connection: connection.name }); + this.promClient.connectionSyncJobFailTotal.inc({ connection: connection.name }); + + logger.error(`Job ${job.id} timed out for connection ${connection.name} (id: ${connection.id})`); + + captureEvent('backend_connection_sync_job_failed', { + connectionId: connection.id, + error: 'Job timed out', + }); + }); + private async onWorkerError(error: Error) { Sentry.captureException(error); logger.error(`Connection syncer worker error.`, error); @@ -398,7 +438,22 @@ export class ConnectionManager { clearInterval(this.interval); } + const inProgressJobs = this.worker.getCurrentJobs(); await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS); + + // Manually release group locks for in progress jobs to prevent deadlocks. + // @see: https://github.com/Openpanel-dev/groupmq/issues/8 + for (const { job } of inProgressJobs) { + const lockKey = `groupmq:${QUEUE_NAME}:lock:${job.groupId}`; + logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`); + try { + await this.redis.del(lockKey); + } catch (error) { + Sentry.captureException(error); + logger.error(`Failed to release group lock ${lockKey} for in progress job ${job.id}. Error: `, error); + } + } + // @note: As of groupmq v1.0.0, queue.close() will just close the underlying // redis connection. 
Since we share the same redis client between // @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900 diff --git a/packages/backend/src/repoCompileUtils.ts b/packages/backend/src/repoCompileUtils.ts index 10c748a8..5b2c0349 100644 --- a/packages/backend/src/repoCompileUtils.ts +++ b/packages/backend/src/repoCompileUtils.ts @@ -39,8 +39,8 @@ type CompileResult = { export const compileGithubConfig = async ( config: GithubConnectionConfig, connectionId: number, - abortController: AbortController): Promise => { - const gitHubReposResult = await getGitHubReposFromConfig(config, abortController.signal); + signal: AbortSignal): Promise => { + const gitHubReposResult = await getGitHubReposFromConfig(config, signal); const gitHubRepos = gitHubReposResult.repos; const warnings = gitHubReposResult.warnings; From 39f31cf699494759f3920476253eb60acf0ff5d1 Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 11 Nov 2025 15:00:05 -0800 Subject: [PATCH 3/8] Add workaround to repo index manager --- packages/backend/src/repoIndexManager.ts | 34 +++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/packages/backend/src/repoIndexManager.ts b/packages/backend/src/repoIndexManager.ts index 9370c85b..3aca629f 100644 --- a/packages/backend/src/repoIndexManager.ts +++ b/packages/backend/src/repoIndexManager.ts @@ -45,7 +45,7 @@ export class RepoIndexManager { constructor( private db: PrismaClient, private settings: Settings, - redis: Redis, + private redis: Redis, private promClient: PromClient, ) { this.queue = new Queue({ @@ -68,6 +68,7 @@ export class RepoIndexManager { this.worker.on('completed', this.onJobCompleted.bind(this)); this.worker.on('failed', this.onJobFailed.bind(this)); + this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this)); this.worker.on('stalled', this.onJobStalled.bind(this)); this.worker.on('error', this.onWorkerError.bind(this)); } @@ -540,6 +541,28 @@ export class RepoIndexManager { logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`); }); + private onJobGracefulTimeout = async (job: Job) => + groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => { + const logger = createJobLogger(job.data.jobId); + const jobTypeLabel = getJobTypePrometheusLabel(job.data.type); + + const { repo } = await this.db.repoIndexingJob.update({ + where: { id: job.data.jobId }, + data: { + status: RepoIndexingJobStatus.FAILED, + completedAt: new Date(), + errorMessage: 'Job timed out', + }, + select: { repo: true } + }); + + this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel }); + this.promClient.repoIndexJobFailTotal.inc({ repo: job.data.repoName, type: jobTypeLabel }); + + logger.error(`Job ${job.data.jobId} timed out for repo ${repo.name} (id: ${repo.id}). Failing job.`); + + }); + private async onWorkerError(error: Error) { Sentry.captureException(error); logger.error(`Index syncer worker error.`, error); @@ -549,7 +572,16 @@ export class RepoIndexManager { if (this.interval) { clearInterval(this.interval); } + const inProgressJobs = this.worker.getCurrentJobs(); await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS); + // Manually release group locks for in progress jobs to prevent deadlocks. 
+ // @see: https://github.com/Openpanel-dev/groupmq/issues/8 + for (const { job } of inProgressJobs) { + const lockKey = `groupmq:repo-index-queue:lock:${job.groupId}`; + logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`); + await this.redis.del(lockKey); + } + // @note: As of groupmq v1.0.0, queue.close() will just close the underlying // redis connection. Since we share the same redis client between // @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900 From dde0bc175f2af3d46258a9af9d60e808cdeb6c5a Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 11 Nov 2025 15:16:05 -0800 Subject: [PATCH 4/8] s --- packages/backend/src/connectionManager.ts | 12 +++++++---- packages/backend/src/repoIndexManager.ts | 26 ++++++++++++++++++++--- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts index f9ee3010..8b6fa6c4 100644 --- a/packages/backend/src/connectionManager.ts +++ b/packages/backend/src/connectionManager.ts @@ -62,9 +62,12 @@ export class ConnectionManager { this.worker.on('completed', this.onJobCompleted.bind(this)); this.worker.on('failed', this.onJobFailed.bind(this)); - this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this)); this.worker.on('stalled', this.onJobStalled.bind(this)); this.worker.on('error', this.onWorkerError.bind(this)); + // graceful-timeout is triggered when a job is still processing after + // worker.close() is called and the timeout period has elapsed. In this case, + // we fail the job with no retry. + this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this)); } public startScheduler() { @@ -163,8 +166,8 @@ export class ConnectionManager { } }); - if (currentStatus.status !== ConnectionSyncJobStatus.PENDING) { - throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING}. Actual: ${currentStatus.status}. Skipping.`); + if (currentStatus.status !== ConnectionSyncJobStatus.PENDING && currentStatus.status !== ConnectionSyncJobStatus.IN_PROGRESS) { + throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING} or ${ConnectionSyncJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`); } @@ -455,7 +458,8 @@ export class ConnectionManager { } // @note: As of groupmq v1.0.0, queue.close() will just close the underlying - // redis connection. Since we share the same redis client between + // redis connection. Since we share the same redis client between, skip this + // step and close the redis client directly in index.ts. // @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900 // await this.queue.close(); } diff --git a/packages/backend/src/repoIndexManager.ts b/packages/backend/src/repoIndexManager.ts index 3aca629f..5a576d05 100644 --- a/packages/backend/src/repoIndexManager.ts +++ b/packages/backend/src/repoIndexManager.ts @@ -68,9 +68,12 @@ export class RepoIndexManager { this.worker.on('completed', this.onJobCompleted.bind(this)); this.worker.on('failed', this.onJobFailed.bind(this)); - this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this)); this.worker.on('stalled', this.onJobStalled.bind(this)); this.worker.on('error', this.onWorkerError.bind(this)); + // graceful-timeout is triggered when a job is still processing after + // worker.close() is called and the timeout period has elapsed. In this case, + // we fail the job with no retry. 
+ this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this)); } public startScheduler() { @@ -231,6 +234,23 @@ export class RepoIndexManager { const logger = createJobLogger(id); logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`); + const currentStatus = await this.db.repoIndexingJob.findUniqueOrThrow({ + where: { + id, + }, + select: { + status: true, + } + }); + + // Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job + // is in an invalid state and should be skipped. + if ( + currentStatus.status !== RepoIndexingJobStatus.PENDING && + currentStatus.status !== RepoIndexingJobStatus.IN_PROGRESS + ) { + throw new Error(`Job ${id} is not in a valid state. Expected: ${RepoIndexingJobStatus.PENDING} or ${RepoIndexingJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`); + } const { repo, type: jobType } = await this.db.repoIndexingJob.update({ where: { @@ -583,8 +603,8 @@ export class RepoIndexManager { } // @note: As of groupmq v1.0.0, queue.close() will just close the underlying - // redis connection. Since we share the same redis client between - // @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900 + // redis connection. Since we share the same redis client between, skip this + // step and close the redis client directly in index.ts. // await this.queue.close(); } } From ee498f0a906d5a5c86d7e5304c351355b1df5166 Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 11 Nov 2025 15:27:19 -0800 Subject: [PATCH 5/8] fix merge issue --- packages/backend/src/index.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 75344352..78b0eca7 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -106,11 +106,8 @@ const listenToShutdownSignals = () => { await connectionManager.dispose() await repoPermissionSyncer.dispose() await accountPermissionSyncer.dispose() - await promClient.dispose() await configManager.dispose() - logger.info('Disposed all workers'); - await prisma.$disconnect(); await redis.quit(); await api.dispose(); From d43f35bfc1ee090590ce79c20d8e3351f1eb28b2 Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 11 Nov 2025 15:33:49 -0800 Subject: [PATCH 6/8] add comment --- packages/backend/src/connectionManager.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts index 8b6fa6c4..bfb414df 100644 --- a/packages/backend/src/connectionManager.ts +++ b/packages/backend/src/connectionManager.ts @@ -166,6 +166,8 @@ export class ConnectionManager { } }); + // Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job + // is in an invalid state and should be skipped. if (currentStatus.status !== ConnectionSyncJobStatus.PENDING && currentStatus.status !== ConnectionSyncJobStatus.IN_PROGRESS) { throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING} or ${ConnectionSyncJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. 
Skipping.`); } From b5d03527d962599bf2b6b7dcc856a642390392eb Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 11 Nov 2025 20:02:51 -0800 Subject: [PATCH 7/8] feedback --- packages/backend/src/configManager.ts | 4 ++-- packages/backend/src/index.ts | 30 +++++++++++++++------------ 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/packages/backend/src/configManager.ts b/packages/backend/src/configManager.ts index 55dbd6ed..6049a52f 100644 --- a/packages/backend/src/configManager.ts +++ b/packages/backend/src/configManager.ts @@ -93,8 +93,8 @@ export class ConfigManager { }); if (connectionNeedsSyncing) { - const [jobId] = await this.connectionManager.createJobs([connection]); - logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Created sync job ${jobId}.`); + logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Creating sync job.`); + await this.connectionManager.createJobs([connection]); } } } diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 78b0eca7..c3674834 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -42,13 +42,14 @@ const prisma = new PrismaClient({ const redis = new Redis(env.REDIS_URL, { maxRetriesPerRequest: null }); -redis.ping().then(() => { + +try { + await redis.ping(); logger.info('Connected to redis'); -}).catch((err: unknown) => { - logger.error('Failed to connect to redis'); - logger.error(err); +} catch (err: unknown) { + logger.error('Failed to connect to redis. Error:', err); process.exit(1); -}); +} const promClient = new PromClient(); @@ -85,8 +86,6 @@ const api = new Api( logger.info('Worker started.'); - - const listenToShutdownSignals = () => { const signals = SHUTDOWN_SIGNALS; @@ -115,29 +114,34 @@ const listenToShutdownSignals = () => { logger.info('All workers shut down gracefully'); - signals.forEach(sig => process.removeListener(sig, cleanup)); - process.kill(process.pid, signal); } catch (error) { Sentry.captureException(error); logger.error('Error shutting down worker:', error); - process.exit(1); } } signals.forEach(signal => { - process.on(signal, cleanup); + process.on(signal, (err) => { + cleanup(err).finally(() => { + process.kill(process.pid, signal); + }); + }); }); // Register handlers for uncaught exceptions and unhandled rejections process.on('uncaughtException', (err) => { logger.error(`Uncaught exception: ${err.message}`); - cleanup('uncaughtException').finally(() => process.exit(1)); + cleanup('uncaughtException').finally(() => { + process.exit(1); + }); }); process.on('unhandledRejection', (reason, promise) => { logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`); - cleanup('unhandledRejection').finally(() => process.exit(1)); + cleanup('unhandledRejection').finally(() => { + process.exit(1); + }); }); From f0b57ffb1f7bf562f9d4f6071e7cb8102bb4da93 Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 11 Nov 2025 20:04:41 -0800 Subject: [PATCH 8/8] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4c1f9f9..37ce8d24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609) - Fixed race condition in job schedulers. 
[#607](https://github.com/sourcebot-dev/sourcebot/pull/607) +- Fixed connection sync jobs getting stuck in pending or in progress after restarting the worker. [#612](https://github.com/sourcebot-dev/sourcebot/pull/612) ### Added - Added force resync buttons for connections and repositories. [#610](https://github.com/sourcebot-dev/sourcebot/pull/610)
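For reference, the once-only shutdown flow introduced in `packages/backend/src/index.ts` boils down to the pattern sketched below. This is a minimal sketch, assuming a `dispose()` helper that tears down the workers and then the shared resources (Prisma, Redis, the API server, PostHog); the names are illustrative, not the exact implementation in the patch.

```typescript
// Minimal sketch of the once-only shutdown pattern (illustrative names).
const SHUTDOWN_SIGNALS: NodeJS.Signals[] = ['SIGINT', 'SIGTERM'];

let shuttingDown = false;

async function dispose(): Promise<void> {
    // Close the job workers first so no new work starts, then shared
    // resources: Prisma, Redis, the HTTP API, and analytics.
}

for (const signal of SHUTDOWN_SIGNALS) {
    process.on(signal, () => {
        if (shuttingDown) {
            return; // ignore repeated signals while cleanup is running
        }
        shuttingDown = true;

        dispose()
            .catch((err) => console.error('Error during shutdown:', err))
            .finally(() => {
                // Re-raise the signal with the handler removed so the process
                // exits with the conventional 128 + signal-number status.
                process.removeAllListeners(signal);
                process.kill(process.pid, signal);
            });
    });
}
```

Re-raising the original signal (rather than calling `process.exit(0)`) lets whatever supervises the worker observe which signal actually terminated it.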
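The deadlock workaround in the `dispose()` methods can also be summarized on its own. This is a rough sketch assuming an `ioredis` client and the list of jobs that were still processing when `worker.close()` hit its graceful timeout; the lock key format mirrors what the patch uses and is an assumption about groupmq v1.0.0 internals, not a documented API.

```typescript
import { Redis } from 'ioredis';

type InProgressJob = { id: string; groupId: string };

// Workaround for https://github.com/Openpanel-dev/groupmq/issues/8: release
// the per-group lock keys ourselves so the groups are not deadlocked when the
// worker restarts.
export async function releaseGroupLocks(
    redis: Redis,
    namespace: string, // e.g. 'connection-sync-queue' or 'repo-index-queue'
    inProgressJobs: InProgressJob[],
): Promise<void> {
    for (const job of inProgressJobs) {
        const lockKey = `groupmq:${namespace}:lock:${job.groupId}`;
        try {
            await redis.del(lockKey);
        } catch (error) {
            // A failed delete should not block the rest of shutdown.
            console.error(`Failed to release ${lockKey} for job ${job.id}`, error);
        }
    }
}
```

Because both managers share one Redis client, the client itself is closed once in `index.ts` via `redis.quit()` rather than through `queue.close()` on each queue.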