diff --git a/apps/sim/background/cleanup-logs.ts b/apps/sim/background/cleanup-logs.ts index af272e35c8..41fb518935 100644 --- a/apps/sim/background/cleanup-logs.ts +++ b/apps/sim/background/cleanup-logs.ts @@ -4,169 +4,71 @@ import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { and, inArray, lt } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' +import { + batchDeleteByWorkspaceAndTimestamp, + chunkedBatchDelete, + type TableCleanupResult, +} from '@/lib/cleanup/batch-delete' import { snapshotService } from '@/lib/logs/execution/snapshot/service' import { isUsingCloudStorage, StorageService } from '@/lib/uploads' import { deleteFileMetadata } from '@/lib/uploads/server/metadata' const logger = createLogger('CleanupLogs') -const BATCH_SIZE = 2000 -const MAX_BATCHES_PER_TIER = 10 - -interface TierResults { - total: number - deleted: number - deleteFailed: number +interface FileDeleteStats { filesTotal: number filesDeleted: number filesDeleteFailed: number } -function emptyTierResults(): TierResults { - return { - total: 0, - deleted: 0, - deleteFailed: 0, - filesTotal: 0, - filesDeleted: 0, - filesDeleteFailed: 0, - } -} - -async function deleteExecutionFiles(files: unknown, results: TierResults): Promise { +async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise { if (!isUsingCloudStorage() || !files || !Array.isArray(files)) return const keys = files.filter((f) => f && typeof f === 'object' && f.key).map((f) => f.key as string) - results.filesTotal += keys.length + stats.filesTotal += keys.length await Promise.all( keys.map(async (key) => { try { await StorageService.deleteFile({ key, context: 'execution' }) await deleteFileMetadata(key) - results.filesDeleted++ + stats.filesDeleted++ } catch (fileError) { - results.filesDeleteFailed++ + stats.filesDeleteFailed++ logger.error(`Failed to delete file ${key}:`, { fileError }) } }) ) } -async function cleanupTier( - workspaceIds: string[], - retentionDate: Date, - label: string -): Promise { - const results = emptyTierResults() - if (workspaceIds.length === 0) return results - - let batchesProcessed = 0 - let hasMore = true - - while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) { - const batch = await db - .select({ - id: workflowExecutionLogs.id, - files: workflowExecutionLogs.files, - }) - .from(workflowExecutionLogs) - .where( - and( - inArray(workflowExecutionLogs.workspaceId, workspaceIds), - lt(workflowExecutionLogs.startedAt, retentionDate) - ) - ) - .limit(BATCH_SIZE) - - results.total += batch.length - - if (batch.length === 0) { - hasMore = false - break - } - - for (const log of batch) { - await deleteExecutionFiles(log.files, results) - } - - const logIds = batch.map((log) => log.id) - try { - const deleted = await db - .delete(workflowExecutionLogs) - .where(inArray(workflowExecutionLogs.id, logIds)) - .returning({ id: workflowExecutionLogs.id }) - - results.deleted += deleted.length - } catch (deleteError) { - results.deleteFailed += logIds.length - logger.error(`Batch delete failed for ${label}:`, { deleteError }) - } - - batchesProcessed++ - hasMore = batch.length === BATCH_SIZE - - logger.info(`[${label}] Batch ${batchesProcessed}: ${batch.length} logs processed`) - } - - return results -} - -interface JobLogCleanupResults { - deleted: number - deleteFailed: number -} - -async function cleanupJobExecutionLogsTier( +async function cleanupWorkflowExecutionLogs( workspaceIds: string[], retentionDate: Date, label: string -): Promise { - const results: JobLogCleanupResults = { deleted: 0, deleteFailed: 0 } - if (workspaceIds.length === 0) return results - - let batchesProcessed = 0 - let hasMore = true - - while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) { - const batch = await db - .select({ id: jobExecutionLogs.id }) - .from(jobExecutionLogs) - .where( - and( - inArray(jobExecutionLogs.workspaceId, workspaceIds), - lt(jobExecutionLogs.startedAt, retentionDate) +): Promise { + const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 } + + const dbStats = await chunkedBatchDelete({ + tableDef: workflowExecutionLogs, + workspaceIds, + tableName: `${label}/workflow_execution_logs`, + selectChunk: (chunkIds, limit) => + db + .select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files }) + .from(workflowExecutionLogs) + .where( + and( + inArray(workflowExecutionLogs.workspaceId, chunkIds), + lt(workflowExecutionLogs.startedAt, retentionDate) + ) ) - ) - .limit(BATCH_SIZE) - - if (batch.length === 0) { - hasMore = false - break - } + .limit(limit), + onBatch: async (rows) => { + for (const row of rows) await deleteExecutionFiles(row.files, fileStats) + }, + }) - const logIds = batch.map((log) => log.id) - try { - const deleted = await db - .delete(jobExecutionLogs) - .where(inArray(jobExecutionLogs.id, logIds)) - .returning({ id: jobExecutionLogs.id }) - - results.deleted += deleted.length - } catch (deleteError) { - results.deleteFailed += logIds.length - logger.error(`Batch delete failed for ${label} (job_execution_logs):`, { deleteError }) - } - - batchesProcessed++ - hasMore = batch.length === BATCH_SIZE - - logger.info( - `[${label}] job_execution_logs batch ${batchesProcessed}: ${batch.length} rows processed` - ) - } - - return results + return { ...dbStats, ...fileStats } } export async function runCleanupLogs(payload: CleanupJobPayload): Promise { @@ -190,15 +92,19 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise `[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}` ) - const results = await cleanupTier(workspaceIds, retentionDate, label) + const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label) logger.info( - `[${label}] workflow_execution_logs: ${results.deleted} deleted, ${results.deleteFailed} failed out of ${results.total} candidates` + `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed` ) - const jobLogResults = await cleanupJobExecutionLogsTier(workspaceIds, retentionDate, label) - logger.info( - `[${label}] job_execution_logs: ${jobLogResults.deleted} deleted, ${jobLogResults.deleteFailed} failed` - ) + await batchDeleteByWorkspaceAndTimestamp({ + tableDef: jobExecutionLogs, + workspaceIdCol: jobExecutionLogs.workspaceId, + timestampCol: jobExecutionLogs.startedAt, + workspaceIds, + retentionDate, + tableName: `${label}/job_execution_logs`, + }) // Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces. if (payload.plan === 'free') { diff --git a/apps/sim/background/cleanup-soft-deletes.ts b/apps/sim/background/cleanup-soft-deletes.ts index aa5a1ef51b..5173307407 100644 --- a/apps/sim/background/cleanup-soft-deletes.ts +++ b/apps/sim/background/cleanup-soft-deletes.ts @@ -18,9 +18,8 @@ import { and, inArray, isNotNull, lt } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, - DEFAULT_BATCH_SIZE, - DEFAULT_MAX_BATCHES_PER_TABLE, deleteRowsById, + selectRowsByIdChunks, } from '@/lib/cleanup/batch-delete' import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup' import type { StorageContext } from '@/lib/uploads' @@ -44,35 +43,37 @@ async function selectExpiredWorkspaceFiles( workspaceIds: string[], retentionDate: Date ): Promise { - const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE - const [legacyRows, multiContextRows] = await Promise.all([ - db - .select({ id: workspaceFile.id, key: workspaceFile.key }) - .from(workspaceFile) - .where( - and( - inArray(workspaceFile.workspaceId, workspaceIds), - isNotNull(workspaceFile.deletedAt), - lt(workspaceFile.deletedAt, retentionDate) + selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: workspaceFile.id, key: workspaceFile.key }) + .from(workspaceFile) + .where( + and( + inArray(workspaceFile.workspaceId, chunkIds), + isNotNull(workspaceFile.deletedAt), + lt(workspaceFile.deletedAt, retentionDate) + ) ) - ) - .limit(limit), - db - .select({ - id: workspaceFiles.id, - key: workspaceFiles.key, - context: workspaceFiles.context, - }) - .from(workspaceFiles) - .where( - and( - inArray(workspaceFiles.workspaceId, workspaceIds), - isNotNull(workspaceFiles.deletedAt), - lt(workspaceFiles.deletedAt, retentionDate) + .limit(chunkLimit) + ), + selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ + id: workspaceFiles.id, + key: workspaceFiles.key, + context: workspaceFiles.context, + }) + .from(workspaceFiles) + .where( + and( + inArray(workspaceFiles.workspaceId, chunkIds), + isNotNull(workspaceFiles.deletedAt), + lt(workspaceFiles.deletedAt, retentionDate) + ) ) - ) - .limit(limit), + .limit(chunkLimit) + ), ]) return { @@ -182,17 +183,19 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise // (chats + S3) AND the DB deletes below — selecting twice could return // different subsets above the LIMIT cap and orphan or prematurely purge data. const [doomedWorkflows, fileScope] = await Promise.all([ - db - .select({ id: workflow.id }) - .from(workflow) - .where( - and( - inArray(workflow.workspaceId, workspaceIds), - isNotNull(workflow.archivedAt), - lt(workflow.archivedAt, retentionDate) + selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: workflow.id }) + .from(workflow) + .where( + and( + inArray(workflow.workspaceId, chunkIds), + isNotNull(workflow.archivedAt), + lt(workflow.archivedAt, retentionDate) + ) ) - ) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE), + .limit(chunkLimit) + ), selectExpiredWorkspaceFiles(workspaceIds, retentionDate), ]) @@ -200,11 +203,13 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise let chatCleanup: { execute: () => Promise } | null = null if (doomedWorkflowIds.length > 0) { - const doomedChats = await db - .select({ id: copilotChats.id }) - .from(copilotChats) - .where(inArray(copilotChats.workflowId, doomedWorkflowIds)) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE) + const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, (chunkIds, chunkLimit) => + db + .select({ id: copilotChats.id }) + .from(copilotChats) + .where(inArray(copilotChats.workflowId, chunkIds)) + .limit(chunkLimit) + ) const doomedChatIds = doomedChats.map((c) => c.id) if (doomedChatIds.length > 0) { diff --git a/apps/sim/background/cleanup-tasks.ts b/apps/sim/background/cleanup-tasks.ts index ed236882a9..25ef67461e 100644 --- a/apps/sim/background/cleanup-tasks.ts +++ b/apps/sim/background/cleanup-tasks.ts @@ -13,9 +13,8 @@ import { and, inArray, lt, sql } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, - DEFAULT_BATCH_SIZE, - DEFAULT_MAX_BATCHES_PER_TABLE, deleteRowsById, + selectRowsByIdChunks, type TableCleanupResult, } from '@/lib/cleanup/batch-delete' import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup' @@ -67,13 +66,15 @@ async function cleanupRunChildren( ): Promise { if (workspaceIds.length === 0) return [] - const runIds = await db - .select({ id: copilotRuns.id }) - .from(copilotRuns) - .where( - and(inArray(copilotRuns.workspaceId, workspaceIds), lt(copilotRuns.updatedAt, retentionDate)) - ) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE) + const runIds = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: copilotRuns.id }) + .from(copilotRuns) + .where( + and(inArray(copilotRuns.workspaceId, chunkIds), lt(copilotRuns.updatedAt, retentionDate)) + ) + .limit(chunkLimit) + ) if (runIds.length === 0) { return RUN_CHILD_TABLES.map((t) => ({ table: `${label}/${t.name}`, deleted: 0, failed: 0 })) @@ -107,17 +108,15 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise `[${label}] Processing ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}` ) - // Collect chat IDs before deleting so we can clean up the copilot backend after - const doomedChats = await db - .select({ id: copilotChats.id }) - .from(copilotChats) - .where( - and( - inArray(copilotChats.workspaceId, workspaceIds), - lt(copilotChats.updatedAt, retentionDate) + const doomedChats = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: copilotChats.id }) + .from(copilotChats) + .where( + and(inArray(copilotChats.workspaceId, chunkIds), lt(copilotChats.updatedAt, retentionDate)) ) - ) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE) + .limit(chunkLimit) + ) const doomedChatIds = doomedChats.map((c) => c.id) diff --git a/apps/sim/lib/cleanup/batch-delete.ts b/apps/sim/lib/cleanup/batch-delete.ts index 78d0f8867a..dbf6cf21c5 100644 --- a/apps/sim/lib/cleanup/batch-delete.ts +++ b/apps/sim/lib/cleanup/batch-delete.ts @@ -7,6 +7,55 @@ const logger = createLogger('BatchDelete') export const DEFAULT_BATCH_SIZE = 2000 export const DEFAULT_MAX_BATCHES_PER_TABLE = 10 +/** + * Split workspaceIds into this-sized groups before running SELECT/DELETE. Large + * IN lists combined with `started_at < X` force Postgres to probe every + * workspace range in the composite index, which blows the 90s statement timeout + * at the scale of the full free tier. + */ +export const DEFAULT_WORKSPACE_CHUNK_SIZE = 50 + +export function chunkArray(arr: T[], size: number): T[][] { + const out: T[][] = [] + for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)) + return out +} + +export interface SelectByIdChunksOptions { + /** Cap on rows returned across all chunks. Defaults to a full per-table cleanup budget. */ + overallLimit?: number + chunkSize?: number +} + +/** + * Run a SELECT query once per ID chunk and concatenate results up to + * `overallLimit`. Each chunk's query is passed the remaining row budget so the + * total never exceeds the cap. Use this when you need the selected row set + * (e.g. to drive S3 or copilot-backend cleanup alongside the DB delete). + * + * Works for any large ID set — workspace IDs, workflow IDs, etc. Avoids + * sending one massive `IN (...)` list that would blow Postgres's statement + * timeout. + */ +export async function selectRowsByIdChunks( + ids: string[], + query: (chunkIds: string[], chunkLimit: number) => Promise, + { + overallLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE, + chunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE, + }: SelectByIdChunksOptions = {} +): Promise { + if (ids.length === 0) return [] + + const rows: T[] = [] + for (const chunkIds of chunkArray(ids, chunkSize)) { + if (rows.length >= overallLimit) break + const remaining = overallLimit - rows.length + const chunkRows = await query(chunkIds, remaining) + rows.push(...chunkRows) + } + return rows +} export interface TableCleanupResult { table: string @@ -14,36 +63,48 @@ export interface TableCleanupResult { failed: number } -export interface BatchDeleteOptions { +export interface ChunkedBatchDeleteOptions { tableDef: PgTable - workspaceIdCol: PgColumn - timestampCol: PgColumn workspaceIds: string[] - retentionDate: Date tableName: string - /** When true, also requires `timestampCol IS NOT NULL` (soft-delete semantics). */ - requireTimestampNotNull?: boolean + /** SELECT eligible rows for one workspace chunk. The result must include `id`. */ + selectChunk: (chunkIds: string[], limit: number) => Promise + /** Runs between SELECT and DELETE; receives the just-selected rows. */ + onBatch?: (rows: TRow[]) => Promise batchSize?: number + /** Max batches per workspace chunk. */ maxBatches?: number + /** + * Hard cap on rows processed (deleted + failed) across all chunks per call. + * Defaults to `DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE`. Cron + * runs frequently enough to catch up the backlog over multiple invocations. + */ + totalRowLimit?: number + workspaceChunkSize?: number } /** - * Iteratively delete rows in a table matching a workspace + time-based predicate. + * Inner loop primitive for cleanup jobs. * - * Uses a SELECT-with-LIMIT → DELETE-by-ID pattern to keep each round bounded in - * memory and I/O (PostgreSQL DELETE does not support LIMIT directly). + * For each workspace chunk: SELECT a batch of eligible rows → run optional + * `onBatch` hook (e.g. to delete S3 files) → DELETE those rows by ID. Repeats + * until exhausted or `maxBatches` is hit, then moves to the next chunk. Stops + * the whole call once `totalRowLimit` rows have been processed. + * + * Workspace IDs are chunked before the SELECT — see + * `DEFAULT_WORKSPACE_CHUNK_SIZE` for why. */ -export async function batchDeleteByWorkspaceAndTimestamp({ +export async function chunkedBatchDelete({ tableDef, - workspaceIdCol, - timestampCol, workspaceIds, - retentionDate, tableName, - requireTimestampNotNull = false, + selectChunk, + onBatch, batchSize = DEFAULT_BATCH_SIZE, maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE, -}: BatchDeleteOptions): Promise { + totalRowLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE, + workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE, +}: ChunkedBatchDeleteOptions): Promise { const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 } if (workspaceIds.length === 0) { @@ -51,48 +112,108 @@ export async function batchDeleteByWorkspaceAndTimestamp({ return result } - const predicates = [inArray(workspaceIdCol, workspaceIds), lt(timestampCol, retentionDate)] - if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol)) - const whereClause = and(...predicates) + const chunks = chunkArray(workspaceIds, workspaceChunkSize) + let stoppedEarly = false + + for (const [chunkIdx, chunkIds] of chunks.entries()) { + if (result.deleted + result.failed >= totalRowLimit) { + stoppedEarly = true + break + } - let batchesProcessed = 0 - let hasMore = true + let batchesProcessed = 0 + let hasMore = true - while (hasMore && batchesProcessed < maxBatches) { - try { - const batch = await db - .select({ id: sql`id` }) - .from(tableDef) - .where(whereClause) - .limit(batchSize) + while ( + hasMore && + batchesProcessed < maxBatches && + result.deleted + result.failed < totalRowLimit + ) { + let rows: TRow[] = [] + try { + rows = await selectChunk(chunkIds, batchSize) + + if (rows.length === 0) { + hasMore = false + break + } - if (batch.length === 0) { - logger.info(`[${tableName}] No expired rows found`) + if (onBatch) await onBatch(rows) + + const ids = rows.map((r) => r.id) + const deleted = await db + .delete(tableDef) + .where(inArray(sql`id`, ids)) + .returning({ id: sql`id` }) + + result.deleted += deleted.length + hasMore = rows.length === batchSize + batchesProcessed++ + } catch (error) { + // Count rows we tried to delete; SELECT-stage errors leave rows=[]. + result.failed += rows.length + logger.error( + `[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${chunks.length}, ${rows.length} rows):`, + { error } + ) hasMore = false - break } - - const ids = batch.map((r) => r.id) - const deleted = await db - .delete(tableDef) - .where(inArray(sql`id`, ids)) - .returning({ id: sql`id` }) - - result.deleted += deleted.length - hasMore = batch.length === batchSize - batchesProcessed++ - - logger.info(`[${tableName}] Batch ${batchesProcessed}: deleted ${deleted.length} rows`) - } catch (error) { - result.failed++ - logger.error(`[${tableName}] Batch delete failed:`, { error }) - hasMore = false } } + logger.info( + `[${tableName}] Complete: ${result.deleted} deleted, ${result.failed} failed across ${chunks.length} chunks${stoppedEarly ? ' (row-limit reached, remaining chunks deferred to next run)' : ''}` + ) + return result } +export interface BatchDeleteOptions { + tableDef: PgTable + workspaceIdCol: PgColumn + timestampCol: PgColumn + workspaceIds: string[] + retentionDate: Date + tableName: string + /** When true, also requires `timestampCol IS NOT NULL` (soft-delete semantics). */ + requireTimestampNotNull?: boolean + batchSize?: number + maxBatches?: number + workspaceChunkSize?: number +} + +/** + * Convenience wrapper around `chunkedBatchDelete` for the common case: delete + * rows where `workspaceId IN (...) AND timestamp < retentionDate`. Use this + * when there's no per-row side effect (e.g. no S3 files to clean up alongside). + */ +export async function batchDeleteByWorkspaceAndTimestamp({ + tableDef, + workspaceIdCol, + timestampCol, + workspaceIds, + retentionDate, + tableName, + requireTimestampNotNull = false, + ...rest +}: BatchDeleteOptions): Promise { + return chunkedBatchDelete({ + tableDef, + workspaceIds, + tableName, + selectChunk: (chunkIds, limit) => { + const predicates = [inArray(workspaceIdCol, chunkIds), lt(timestampCol, retentionDate)] + if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol)) + return db + .select({ id: sql`id` }) + .from(tableDef) + .where(and(...predicates)) + .limit(limit) + }, + ...rest, + }) +} + /** * Delete rows by an explicit list of IDs. Use this when the IDs were selected * upstream (e.g., to drive external cleanup like S3 deletes or a backend API