From 66a66d5656c290a792bd983f0728b6125bb7df27 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 27 Apr 2026 10:18:56 -0700 Subject: [PATCH 1/2] fix(retention-job): add chunking strategy for cleanup --- apps/sim/background/cleanup-logs.ts | 186 +++++-------------- apps/sim/background/cleanup-soft-deletes.ts | 77 ++++---- apps/sim/background/cleanup-tasks.ts | 37 ++-- apps/sim/lib/cleanup/batch-delete.ts | 188 +++++++++++++++----- 4 files changed, 248 insertions(+), 240 deletions(-) diff --git a/apps/sim/background/cleanup-logs.ts b/apps/sim/background/cleanup-logs.ts index af272e35c8..5f81a56311 100644 --- a/apps/sim/background/cleanup-logs.ts +++ b/apps/sim/background/cleanup-logs.ts @@ -4,169 +4,71 @@ import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { and, inArray, lt } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' +import { + batchDeleteByWorkspaceAndTimestamp, + chunkedBatchDelete, + type TableCleanupResult, +} from '@/lib/cleanup/batch-delete' import { snapshotService } from '@/lib/logs/execution/snapshot/service' import { isUsingCloudStorage, StorageService } from '@/lib/uploads' import { deleteFileMetadata } from '@/lib/uploads/server/metadata' const logger = createLogger('CleanupLogs') -const BATCH_SIZE = 2000 -const MAX_BATCHES_PER_TIER = 10 - -interface TierResults { - total: number - deleted: number - deleteFailed: number +interface FileDeleteStats { filesTotal: number filesDeleted: number filesDeleteFailed: number } -function emptyTierResults(): TierResults { - return { - total: 0, - deleted: 0, - deleteFailed: 0, - filesTotal: 0, - filesDeleted: 0, - filesDeleteFailed: 0, - } -} - -async function deleteExecutionFiles(files: unknown, results: TierResults): Promise { +async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise { if (!isUsingCloudStorage() || !files || !Array.isArray(files)) return const keys = files.filter((f) => f && typeof f === 'object' && f.key).map((f) => f.key as string) - results.filesTotal += keys.length + stats.filesTotal += keys.length await Promise.all( keys.map(async (key) => { try { await StorageService.deleteFile({ key, context: 'execution' }) await deleteFileMetadata(key) - results.filesDeleted++ + stats.filesDeleted++ } catch (fileError) { - results.filesDeleteFailed++ + stats.filesDeleteFailed++ logger.error(`Failed to delete file ${key}:`, { fileError }) } }) ) } -async function cleanupTier( +async function cleanupWorkflowExecutionLogs( workspaceIds: string[], retentionDate: Date, label: string -): Promise { - const results = emptyTierResults() - if (workspaceIds.length === 0) return results - - let batchesProcessed = 0 - let hasMore = true - - while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) { - const batch = await db - .select({ - id: workflowExecutionLogs.id, - files: workflowExecutionLogs.files, - }) - .from(workflowExecutionLogs) - .where( - and( - inArray(workflowExecutionLogs.workspaceId, workspaceIds), - lt(workflowExecutionLogs.startedAt, retentionDate) +): Promise { + const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 } + + const dbStats = await chunkedBatchDelete({ + tableDef: workflowExecutionLogs, + workspaceIds, + tableName: `${label}/workflow_execution_logs`, + selectChunk: (chunkIds, limit) => + db + .select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files }) + .from(workflowExecutionLogs) + .where( + and( + inArray(workflowExecutionLogs.workspaceId, chunkIds), + lt(workflowExecutionLogs.startedAt, retentionDate) + ) ) - ) - .limit(BATCH_SIZE) - - results.total += batch.length + .limit(limit), + onBatch: async (rows) => { + for (const row of rows) await deleteExecutionFiles(row.files, fileStats) + }, + }) - if (batch.length === 0) { - hasMore = false - break - } - - for (const log of batch) { - await deleteExecutionFiles(log.files, results) - } - - const logIds = batch.map((log) => log.id) - try { - const deleted = await db - .delete(workflowExecutionLogs) - .where(inArray(workflowExecutionLogs.id, logIds)) - .returning({ id: workflowExecutionLogs.id }) - - results.deleted += deleted.length - } catch (deleteError) { - results.deleteFailed += logIds.length - logger.error(`Batch delete failed for ${label}:`, { deleteError }) - } - - batchesProcessed++ - hasMore = batch.length === BATCH_SIZE - - logger.info(`[${label}] Batch ${batchesProcessed}: ${batch.length} logs processed`) - } - - return results -} - -interface JobLogCleanupResults { - deleted: number - deleteFailed: number -} - -async function cleanupJobExecutionLogsTier( - workspaceIds: string[], - retentionDate: Date, - label: string -): Promise { - const results: JobLogCleanupResults = { deleted: 0, deleteFailed: 0 } - if (workspaceIds.length === 0) return results - - let batchesProcessed = 0 - let hasMore = true - - while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) { - const batch = await db - .select({ id: jobExecutionLogs.id }) - .from(jobExecutionLogs) - .where( - and( - inArray(jobExecutionLogs.workspaceId, workspaceIds), - lt(jobExecutionLogs.startedAt, retentionDate) - ) - ) - .limit(BATCH_SIZE) - - if (batch.length === 0) { - hasMore = false - break - } - - const logIds = batch.map((log) => log.id) - try { - const deleted = await db - .delete(jobExecutionLogs) - .where(inArray(jobExecutionLogs.id, logIds)) - .returning({ id: jobExecutionLogs.id }) - - results.deleted += deleted.length - } catch (deleteError) { - results.deleteFailed += logIds.length - logger.error(`Batch delete failed for ${label} (job_execution_logs):`, { deleteError }) - } - - batchesProcessed++ - hasMore = batch.length === BATCH_SIZE - - logger.info( - `[${label}] job_execution_logs batch ${batchesProcessed}: ${batch.length} rows processed` - ) - } - - return results + return { ...dbStats, ...fileStats } } export async function runCleanupLogs(payload: CleanupJobPayload): Promise { @@ -190,15 +92,21 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise `[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}` ) - const results = await cleanupTier(workspaceIds, retentionDate, label) - logger.info( - `[${label}] workflow_execution_logs: ${results.deleted} deleted, ${results.deleteFailed} failed out of ${results.total} candidates` - ) + const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label) + if (workflowResults.filesTotal > 0) { + logger.info( + `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed` + ) + } - const jobLogResults = await cleanupJobExecutionLogsTier(workspaceIds, retentionDate, label) - logger.info( - `[${label}] job_execution_logs: ${jobLogResults.deleted} deleted, ${jobLogResults.deleteFailed} failed` - ) + await batchDeleteByWorkspaceAndTimestamp({ + tableDef: jobExecutionLogs, + workspaceIdCol: jobExecutionLogs.workspaceId, + timestampCol: jobExecutionLogs.startedAt, + workspaceIds, + retentionDate, + tableName: `${label}/job_execution_logs`, + }) // Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces. if (payload.plan === 'free') { diff --git a/apps/sim/background/cleanup-soft-deletes.ts b/apps/sim/background/cleanup-soft-deletes.ts index aa5a1ef51b..a472a57b48 100644 --- a/apps/sim/background/cleanup-soft-deletes.ts +++ b/apps/sim/background/cleanup-soft-deletes.ts @@ -21,6 +21,7 @@ import { DEFAULT_BATCH_SIZE, DEFAULT_MAX_BATCHES_PER_TABLE, deleteRowsById, + selectRowsByWorkspaceChunks, } from '@/lib/cleanup/batch-delete' import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup' import type { StorageContext } from '@/lib/uploads' @@ -44,35 +45,37 @@ async function selectExpiredWorkspaceFiles( workspaceIds: string[], retentionDate: Date ): Promise { - const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE - const [legacyRows, multiContextRows] = await Promise.all([ - db - .select({ id: workspaceFile.id, key: workspaceFile.key }) - .from(workspaceFile) - .where( - and( - inArray(workspaceFile.workspaceId, workspaceIds), - isNotNull(workspaceFile.deletedAt), - lt(workspaceFile.deletedAt, retentionDate) + selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: workspaceFile.id, key: workspaceFile.key }) + .from(workspaceFile) + .where( + and( + inArray(workspaceFile.workspaceId, chunkIds), + isNotNull(workspaceFile.deletedAt), + lt(workspaceFile.deletedAt, retentionDate) + ) ) - ) - .limit(limit), - db - .select({ - id: workspaceFiles.id, - key: workspaceFiles.key, - context: workspaceFiles.context, - }) - .from(workspaceFiles) - .where( - and( - inArray(workspaceFiles.workspaceId, workspaceIds), - isNotNull(workspaceFiles.deletedAt), - lt(workspaceFiles.deletedAt, retentionDate) + .limit(chunkLimit) + ), + selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ + id: workspaceFiles.id, + key: workspaceFiles.key, + context: workspaceFiles.context, + }) + .from(workspaceFiles) + .where( + and( + inArray(workspaceFiles.workspaceId, chunkIds), + isNotNull(workspaceFiles.deletedAt), + lt(workspaceFiles.deletedAt, retentionDate) + ) ) - ) - .limit(limit), + .limit(chunkLimit) + ), ]) return { @@ -182,17 +185,19 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise // (chats + S3) AND the DB deletes below — selecting twice could return // different subsets above the LIMIT cap and orphan or prematurely purge data. const [doomedWorkflows, fileScope] = await Promise.all([ - db - .select({ id: workflow.id }) - .from(workflow) - .where( - and( - inArray(workflow.workspaceId, workspaceIds), - isNotNull(workflow.archivedAt), - lt(workflow.archivedAt, retentionDate) + selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: workflow.id }) + .from(workflow) + .where( + and( + inArray(workflow.workspaceId, chunkIds), + isNotNull(workflow.archivedAt), + lt(workflow.archivedAt, retentionDate) + ) ) - ) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE), + .limit(chunkLimit) + ), selectExpiredWorkspaceFiles(workspaceIds, retentionDate), ]) diff --git a/apps/sim/background/cleanup-tasks.ts b/apps/sim/background/cleanup-tasks.ts index ed236882a9..c05d5c707d 100644 --- a/apps/sim/background/cleanup-tasks.ts +++ b/apps/sim/background/cleanup-tasks.ts @@ -13,9 +13,8 @@ import { and, inArray, lt, sql } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, - DEFAULT_BATCH_SIZE, - DEFAULT_MAX_BATCHES_PER_TABLE, deleteRowsById, + selectRowsByWorkspaceChunks, type TableCleanupResult, } from '@/lib/cleanup/batch-delete' import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup' @@ -67,13 +66,15 @@ async function cleanupRunChildren( ): Promise { if (workspaceIds.length === 0) return [] - const runIds = await db - .select({ id: copilotRuns.id }) - .from(copilotRuns) - .where( - and(inArray(copilotRuns.workspaceId, workspaceIds), lt(copilotRuns.updatedAt, retentionDate)) - ) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE) + const runIds = await selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: copilotRuns.id }) + .from(copilotRuns) + .where( + and(inArray(copilotRuns.workspaceId, chunkIds), lt(copilotRuns.updatedAt, retentionDate)) + ) + .limit(chunkLimit) + ) if (runIds.length === 0) { return RUN_CHILD_TABLES.map((t) => ({ table: `${label}/${t.name}`, deleted: 0, failed: 0 })) @@ -107,17 +108,15 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise `[${label}] Processing ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}` ) - // Collect chat IDs before deleting so we can clean up the copilot backend after - const doomedChats = await db - .select({ id: copilotChats.id }) - .from(copilotChats) - .where( - and( - inArray(copilotChats.workspaceId, workspaceIds), - lt(copilotChats.updatedAt, retentionDate) + const doomedChats = await selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: copilotChats.id }) + .from(copilotChats) + .where( + and(inArray(copilotChats.workspaceId, chunkIds), lt(copilotChats.updatedAt, retentionDate)) ) - ) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE) + .limit(chunkLimit) + ) const doomedChatIds = doomedChats.map((c) => c.id) diff --git a/apps/sim/lib/cleanup/batch-delete.ts b/apps/sim/lib/cleanup/batch-delete.ts index 78d0f8867a..753784d68a 100644 --- a/apps/sim/lib/cleanup/batch-delete.ts +++ b/apps/sim/lib/cleanup/batch-delete.ts @@ -7,6 +7,51 @@ const logger = createLogger('BatchDelete') export const DEFAULT_BATCH_SIZE = 2000 export const DEFAULT_MAX_BATCHES_PER_TABLE = 10 +/** + * Split workspaceIds into this-sized groups before running SELECT/DELETE. Large + * IN lists combined with `started_at < X` force Postgres to probe every + * workspace range in the composite index, which blows the 90s statement timeout + * at the scale of the full free tier. + */ +export const DEFAULT_WORKSPACE_CHUNK_SIZE = 50 + +export function chunkArray(arr: T[], size: number): T[][] { + const out: T[][] = [] + for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)) + return out +} + +export interface SelectByWorkspaceChunksOptions { + /** Cap on rows returned across all chunks. Defaults to a full per-table cleanup budget. */ + overallLimit?: number + workspaceChunkSize?: number +} + +/** + * Run a SELECT query once per workspace chunk and concatenate results up to + * `overallLimit`. Each chunk's query is passed the remaining row budget so the + * total never exceeds the cap. Use this when you need the selected row set + * (e.g. to drive S3 or copilot-backend cleanup alongside the DB delete). + */ +export async function selectRowsByWorkspaceChunks( + workspaceIds: string[], + query: (chunkIds: string[], chunkLimit: number) => Promise, + { + overallLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE, + workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE, + }: SelectByWorkspaceChunksOptions = {} +): Promise { + if (workspaceIds.length === 0) return [] + + const rows: T[] = [] + for (const chunkIds of chunkArray(workspaceIds, workspaceChunkSize)) { + if (rows.length >= overallLimit) break + const remaining = overallLimit - rows.length + const chunkRows = await query(chunkIds, remaining) + rows.push(...chunkRows) + } + return rows +} export interface TableCleanupResult { table: string @@ -14,36 +59,40 @@ export interface TableCleanupResult { failed: number } -export interface BatchDeleteOptions { +export interface ChunkedBatchDeleteOptions { tableDef: PgTable - workspaceIdCol: PgColumn - timestampCol: PgColumn workspaceIds: string[] - retentionDate: Date tableName: string - /** When true, also requires `timestampCol IS NOT NULL` (soft-delete semantics). */ - requireTimestampNotNull?: boolean + /** SELECT eligible rows for one workspace chunk. The result must include `id`. */ + selectChunk: (chunkIds: string[], limit: number) => Promise + /** Runs between SELECT and DELETE; receives the just-selected rows. */ + onBatch?: (rows: TRow[]) => Promise batchSize?: number + /** Max batches per workspace chunk. Total per-run cap = chunks * maxBatches * batchSize. */ maxBatches?: number + workspaceChunkSize?: number } /** - * Iteratively delete rows in a table matching a workspace + time-based predicate. + * Inner loop primitive for cleanup jobs. + * + * For each workspace chunk: SELECT a batch of eligible rows → run optional + * `onBatch` hook (e.g. to delete S3 files) → DELETE those rows by ID. Repeats + * until exhausted or `maxBatches` is hit, then moves to the next chunk. * - * Uses a SELECT-with-LIMIT → DELETE-by-ID pattern to keep each round bounded in - * memory and I/O (PostgreSQL DELETE does not support LIMIT directly). + * Workspace IDs are chunked before the SELECT — see + * `DEFAULT_WORKSPACE_CHUNK_SIZE` for why. */ -export async function batchDeleteByWorkspaceAndTimestamp({ +export async function chunkedBatchDelete({ tableDef, - workspaceIdCol, - timestampCol, workspaceIds, - retentionDate, tableName, - requireTimestampNotNull = false, + selectChunk, + onBatch, batchSize = DEFAULT_BATCH_SIZE, maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE, -}: BatchDeleteOptions): Promise { + workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE, +}: ChunkedBatchDeleteOptions): Promise { const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 } if (workspaceIds.length === 0) { @@ -51,48 +100,95 @@ export async function batchDeleteByWorkspaceAndTimestamp({ return result } - const predicates = [inArray(workspaceIdCol, workspaceIds), lt(timestampCol, retentionDate)] - if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol)) - const whereClause = and(...predicates) + const chunks = chunkArray(workspaceIds, workspaceChunkSize) - let batchesProcessed = 0 - let hasMore = true + for (const [chunkIdx, chunkIds] of chunks.entries()) { + let batchesProcessed = 0 + let hasMore = true - while (hasMore && batchesProcessed < maxBatches) { - try { - const batch = await db - .select({ id: sql`id` }) - .from(tableDef) - .where(whereClause) - .limit(batchSize) + while (hasMore && batchesProcessed < maxBatches) { + try { + const rows = await selectChunk(chunkIds, batchSize) + + if (rows.length === 0) { + hasMore = false + break + } + + if (onBatch) await onBatch(rows) + + const ids = rows.map((r) => r.id) + const deleted = await db + .delete(tableDef) + .where(inArray(sql`id`, ids)) + .returning({ id: sql`id` }) - if (batch.length === 0) { - logger.info(`[${tableName}] No expired rows found`) + result.deleted += deleted.length + hasMore = rows.length === batchSize + batchesProcessed++ + } catch (error) { + result.failed++ + logger.error(`[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${chunks.length}):`, { + error, + }) hasMore = false - break } - - const ids = batch.map((r) => r.id) - const deleted = await db - .delete(tableDef) - .where(inArray(sql`id`, ids)) - .returning({ id: sql`id` }) - - result.deleted += deleted.length - hasMore = batch.length === batchSize - batchesProcessed++ - - logger.info(`[${tableName}] Batch ${batchesProcessed}: deleted ${deleted.length} rows`) - } catch (error) { - result.failed++ - logger.error(`[${tableName}] Batch delete failed:`, { error }) - hasMore = false } } + logger.info( + `[${tableName}] Complete: ${result.deleted} rows deleted across ${chunks.length} chunks (${result.failed} chunk failures)` + ) + return result } +export interface BatchDeleteOptions { + tableDef: PgTable + workspaceIdCol: PgColumn + timestampCol: PgColumn + workspaceIds: string[] + retentionDate: Date + tableName: string + /** When true, also requires `timestampCol IS NOT NULL` (soft-delete semantics). */ + requireTimestampNotNull?: boolean + batchSize?: number + maxBatches?: number + workspaceChunkSize?: number +} + +/** + * Convenience wrapper around `chunkedBatchDelete` for the common case: delete + * rows where `workspaceId IN (...) AND timestamp < retentionDate`. Use this + * when there's no per-row side effect (e.g. no S3 files to clean up alongside). + */ +export async function batchDeleteByWorkspaceAndTimestamp({ + tableDef, + workspaceIdCol, + timestampCol, + workspaceIds, + retentionDate, + tableName, + requireTimestampNotNull = false, + ...rest +}: BatchDeleteOptions): Promise { + return chunkedBatchDelete({ + tableDef, + workspaceIds, + tableName, + selectChunk: (chunkIds, limit) => { + const predicates = [inArray(workspaceIdCol, chunkIds), lt(timestampCol, retentionDate)] + if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol)) + return db + .select({ id: sql`id` }) + .from(tableDef) + .where(and(...predicates)) + .limit(limit) + }, + ...rest, + }) +} + /** * Delete rows by an explicit list of IDs. Use this when the IDs were selected * upstream (e.g., to drive external cleanup like S3 deletes or a backend API From 3c48b24af9378ce14fffceaf52dad643c8362e27 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 27 Apr 2026 12:02:19 -0700 Subject: [PATCH 2/2] change stats to be perjob not per chunk --- apps/sim/background/cleanup-logs.ts | 8 +-- apps/sim/background/cleanup-soft-deletes.ts | 22 ++++---- apps/sim/background/cleanup-tasks.ts | 6 +- apps/sim/lib/cleanup/batch-delete.ts | 61 +++++++++++++++------ 4 files changed, 60 insertions(+), 37 deletions(-) diff --git a/apps/sim/background/cleanup-logs.ts b/apps/sim/background/cleanup-logs.ts index 5f81a56311..41fb518935 100644 --- a/apps/sim/background/cleanup-logs.ts +++ b/apps/sim/background/cleanup-logs.ts @@ -93,11 +93,9 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise ) const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label) - if (workflowResults.filesTotal > 0) { - logger.info( - `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed` - ) - } + logger.info( + `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed` + ) await batchDeleteByWorkspaceAndTimestamp({ tableDef: jobExecutionLogs, diff --git a/apps/sim/background/cleanup-soft-deletes.ts b/apps/sim/background/cleanup-soft-deletes.ts index a472a57b48..5173307407 100644 --- a/apps/sim/background/cleanup-soft-deletes.ts +++ b/apps/sim/background/cleanup-soft-deletes.ts @@ -18,10 +18,8 @@ import { and, inArray, isNotNull, lt } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, - DEFAULT_BATCH_SIZE, - DEFAULT_MAX_BATCHES_PER_TABLE, deleteRowsById, - selectRowsByWorkspaceChunks, + selectRowsByIdChunks, } from '@/lib/cleanup/batch-delete' import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup' import type { StorageContext } from '@/lib/uploads' @@ -46,7 +44,7 @@ async function selectExpiredWorkspaceFiles( retentionDate: Date ): Promise { const [legacyRows, multiContextRows] = await Promise.all([ - selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => db .select({ id: workspaceFile.id, key: workspaceFile.key }) .from(workspaceFile) @@ -59,7 +57,7 @@ async function selectExpiredWorkspaceFiles( ) .limit(chunkLimit) ), - selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => db .select({ id: workspaceFiles.id, @@ -185,7 +183,7 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise // (chats + S3) AND the DB deletes below — selecting twice could return // different subsets above the LIMIT cap and orphan or prematurely purge data. const [doomedWorkflows, fileScope] = await Promise.all([ - selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => db .select({ id: workflow.id }) .from(workflow) @@ -205,11 +203,13 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise let chatCleanup: { execute: () => Promise } | null = null if (doomedWorkflowIds.length > 0) { - const doomedChats = await db - .select({ id: copilotChats.id }) - .from(copilotChats) - .where(inArray(copilotChats.workflowId, doomedWorkflowIds)) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE) + const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, (chunkIds, chunkLimit) => + db + .select({ id: copilotChats.id }) + .from(copilotChats) + .where(inArray(copilotChats.workflowId, chunkIds)) + .limit(chunkLimit) + ) const doomedChatIds = doomedChats.map((c) => c.id) if (doomedChatIds.length > 0) { diff --git a/apps/sim/background/cleanup-tasks.ts b/apps/sim/background/cleanup-tasks.ts index c05d5c707d..25ef67461e 100644 --- a/apps/sim/background/cleanup-tasks.ts +++ b/apps/sim/background/cleanup-tasks.ts @@ -14,7 +14,7 @@ import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/clean import { batchDeleteByWorkspaceAndTimestamp, deleteRowsById, - selectRowsByWorkspaceChunks, + selectRowsByIdChunks, type TableCleanupResult, } from '@/lib/cleanup/batch-delete' import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup' @@ -66,7 +66,7 @@ async function cleanupRunChildren( ): Promise { if (workspaceIds.length === 0) return [] - const runIds = await selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + const runIds = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => db .select({ id: copilotRuns.id }) .from(copilotRuns) @@ -108,7 +108,7 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise `[${label}] Processing ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}` ) - const doomedChats = await selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) => + const doomedChats = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => db .select({ id: copilotChats.id }) .from(copilotChats) diff --git a/apps/sim/lib/cleanup/batch-delete.ts b/apps/sim/lib/cleanup/batch-delete.ts index 753784d68a..dbf6cf21c5 100644 --- a/apps/sim/lib/cleanup/batch-delete.ts +++ b/apps/sim/lib/cleanup/batch-delete.ts @@ -21,30 +21,34 @@ export function chunkArray(arr: T[], size: number): T[][] { return out } -export interface SelectByWorkspaceChunksOptions { +export interface SelectByIdChunksOptions { /** Cap on rows returned across all chunks. Defaults to a full per-table cleanup budget. */ overallLimit?: number - workspaceChunkSize?: number + chunkSize?: number } /** - * Run a SELECT query once per workspace chunk and concatenate results up to + * Run a SELECT query once per ID chunk and concatenate results up to * `overallLimit`. Each chunk's query is passed the remaining row budget so the * total never exceeds the cap. Use this when you need the selected row set * (e.g. to drive S3 or copilot-backend cleanup alongside the DB delete). + * + * Works for any large ID set — workspace IDs, workflow IDs, etc. Avoids + * sending one massive `IN (...)` list that would blow Postgres's statement + * timeout. */ -export async function selectRowsByWorkspaceChunks( - workspaceIds: string[], +export async function selectRowsByIdChunks( + ids: string[], query: (chunkIds: string[], chunkLimit: number) => Promise, { overallLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE, - workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE, - }: SelectByWorkspaceChunksOptions = {} + chunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE, + }: SelectByIdChunksOptions = {} ): Promise { - if (workspaceIds.length === 0) return [] + if (ids.length === 0) return [] const rows: T[] = [] - for (const chunkIds of chunkArray(workspaceIds, workspaceChunkSize)) { + for (const chunkIds of chunkArray(ids, chunkSize)) { if (rows.length >= overallLimit) break const remaining = overallLimit - rows.length const chunkRows = await query(chunkIds, remaining) @@ -68,8 +72,14 @@ export interface ChunkedBatchDeleteOptions { /** Runs between SELECT and DELETE; receives the just-selected rows. */ onBatch?: (rows: TRow[]) => Promise batchSize?: number - /** Max batches per workspace chunk. Total per-run cap = chunks * maxBatches * batchSize. */ + /** Max batches per workspace chunk. */ maxBatches?: number + /** + * Hard cap on rows processed (deleted + failed) across all chunks per call. + * Defaults to `DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE`. Cron + * runs frequently enough to catch up the backlog over multiple invocations. + */ + totalRowLimit?: number workspaceChunkSize?: number } @@ -78,7 +88,8 @@ export interface ChunkedBatchDeleteOptions { * * For each workspace chunk: SELECT a batch of eligible rows → run optional * `onBatch` hook (e.g. to delete S3 files) → DELETE those rows by ID. Repeats - * until exhausted or `maxBatches` is hit, then moves to the next chunk. + * until exhausted or `maxBatches` is hit, then moves to the next chunk. Stops + * the whole call once `totalRowLimit` rows have been processed. * * Workspace IDs are chunked before the SELECT — see * `DEFAULT_WORKSPACE_CHUNK_SIZE` for why. @@ -91,6 +102,7 @@ export async function chunkedBatchDelete({ onBatch, batchSize = DEFAULT_BATCH_SIZE, maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE, + totalRowLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE, workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE, }: ChunkedBatchDeleteOptions): Promise { const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 } @@ -101,14 +113,25 @@ export async function chunkedBatchDelete({ } const chunks = chunkArray(workspaceIds, workspaceChunkSize) + let stoppedEarly = false for (const [chunkIdx, chunkIds] of chunks.entries()) { + if (result.deleted + result.failed >= totalRowLimit) { + stoppedEarly = true + break + } + let batchesProcessed = 0 let hasMore = true - while (hasMore && batchesProcessed < maxBatches) { + while ( + hasMore && + batchesProcessed < maxBatches && + result.deleted + result.failed < totalRowLimit + ) { + let rows: TRow[] = [] try { - const rows = await selectChunk(chunkIds, batchSize) + rows = await selectChunk(chunkIds, batchSize) if (rows.length === 0) { hasMore = false @@ -127,17 +150,19 @@ export async function chunkedBatchDelete({ hasMore = rows.length === batchSize batchesProcessed++ } catch (error) { - result.failed++ - logger.error(`[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${chunks.length}):`, { - error, - }) + // Count rows we tried to delete; SELECT-stage errors leave rows=[]. + result.failed += rows.length + logger.error( + `[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${chunks.length}, ${rows.length} rows):`, + { error } + ) hasMore = false } } } logger.info( - `[${tableName}] Complete: ${result.deleted} rows deleted across ${chunks.length} chunks (${result.failed} chunk failures)` + `[${tableName}] Complete: ${result.deleted} deleted, ${result.failed} failed across ${chunks.length} chunks${stoppedEarly ? ' (row-limit reached, remaining chunks deferred to next run)' : ''}` ) return result