diff --git a/apps/sim/app/api/files/multipart/route.ts b/apps/sim/app/api/files/multipart/route.ts
index ac087025083..e9573c01ff6 100644
--- a/apps/sim/app/api/files/multipart/route.ts
+++ b/apps/sim/app/api/files/multipart/route.ts
@@ -8,21 +8,61 @@ import {
   isUsingCloudStorage,
   type StorageContext,
 } from '@/lib/uploads'
+import {
+  signUploadToken,
+  type UploadTokenPayload,
+  verifyUploadToken,
+} from '@/lib/uploads/core/upload-token'
+import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

 const logger = createLogger('MultipartUploadAPI')

+const ALLOWED_UPLOAD_CONTEXTS = new Set([
+  'knowledge-base',
+  'chat',
+  'copilot',
+  'mothership',
+  'execution',
+  'workspace',
+  'profile-pictures',
+  'og-images',
+  'logs',
+  'workspace-logos',
+])
+
 interface InitiateMultipartRequest {
   fileName: string
   contentType: string
   fileSize: number
+  workspaceId: string
   context?: StorageContext
 }

-interface GetPartUrlsRequest {
-  uploadId: string
-  key: string
+interface TokenBoundRequest {
+  uploadToken: string
+}
+
+interface GetPartUrlsRequest extends TokenBoundRequest {
   partNumbers: number[]
-  context?: StorageContext
+}
+
+interface CompleteSingleRequest extends TokenBoundRequest {
+  parts: unknown
+}
+
+interface CompleteBatchRequest {
+  uploads: Array<CompleteSingleRequest>
+}
+
+const verifyTokenForUser = (token: string | undefined, userId: string) => {
+  if (!token || typeof token !== 'string') {
+    return null
+  }
+  const result = verifyUploadToken(token)
+  if (!result.valid || result.payload.userId !== userId) {
+    return null
+  }
+  return result.payload
 }

 export const POST = withRouteHandler(async (request: NextRequest) => {
@@ -31,6 +71,7 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
   if (!session?.user?.id) {
     return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
   }
+  const userId = session.user.id

   const action = request.nextUrl.searchParams.get('action')
@@ -45,32 +86,34 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
   switch (action) {
     case 'initiate': {
-      const data: InitiateMultipartRequest = await request.json()
-      const { fileName, contentType, fileSize, context = 'knowledge-base' } = data
+      const data = (await request.json()) as InitiateMultipartRequest
+      const { fileName, contentType, fileSize, workspaceId, context = 'knowledge-base' } = data

-      const config = getStorageConfig(context)
+      if (!workspaceId || typeof workspaceId !== 'string') {
+        return NextResponse.json({ error: 'workspaceId is required' }, { status: 400 })
+      }

-      if (storageProvider === 's3') {
-        const { initiateS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')
+      if (!ALLOWED_UPLOAD_CONTEXTS.has(context)) {
+        return NextResponse.json({ error: 'Invalid storage context' }, { status: 400 })
+      }

-        const result = await initiateS3MultipartUpload({
-          fileName,
-          contentType,
-          fileSize,
-        })
+      const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId)
+      if (permission !== 'write' && permission !== 'admin') {
+        return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
+      }

-        logger.info(
-          `Initiated S3 multipart upload for ${fileName} (context: ${context}): ${result.uploadId}`
-        )
+      const config = getStorageConfig(context)

-        return NextResponse.json({
-          uploadId: result.uploadId,
-          key: result.key,
-        })
-      }
-      if (storageProvider === 'blob') {
-        const { initiateMultipartUpload } = await import('@/lib/uploads/providers/blob/client')
+      let uploadId: string
+      let key: string
+      if (storageProvider === 's3') {
+
const { initiateS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client') + const result = await initiateS3MultipartUpload({ fileName, contentType, fileSize }) + uploadId = result.uploadId + key = result.key + } else if (storageProvider === 'blob') { + const { initiateMultipartUpload } = await import('@/lib/uploads/providers/blob/client') const result = await initiateMultipartUpload({ fileName, contentType, @@ -82,46 +125,55 @@ export const POST = withRouteHandler(async (request: NextRequest) => { connectionString: config.connectionString, }, }) - - logger.info( - `Initiated Azure multipart upload for ${fileName} (context: ${context}): ${result.uploadId}` + uploadId = result.uploadId + key = result.key + } else { + return NextResponse.json( + { error: `Unsupported storage provider: ${storageProvider}` }, + { status: 400 } ) - - return NextResponse.json({ - uploadId: result.uploadId, - key: result.key, - }) } - return NextResponse.json( - { error: `Unsupported storage provider: ${storageProvider}` }, - { status: 400 } + const uploadToken = signUploadToken({ + uploadId, + key, + userId, + workspaceId, + context, + }) + + logger.info( + `Initiated ${storageProvider} multipart upload for ${fileName} (context: ${context}, workspace: ${workspaceId}): ${uploadId}` ) + + return NextResponse.json({ uploadId, key, uploadToken }) } case 'get-part-urls': { - const data: GetPartUrlsRequest = await request.json() - const { uploadId, key, partNumbers, context = 'knowledge-base' } = data + const data = (await request.json()) as GetPartUrlsRequest + const { partNumbers } = data + + const tokenPayload = verifyTokenForUser(data.uploadToken, userId) + if (!tokenPayload) { + return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 }) + } + const { uploadId, key, context } = tokenPayload const config = getStorageConfig(context) if (storageProvider === 's3') { const { getS3MultipartPartUrls } = await import('@/lib/uploads/providers/s3/client') - const presignedUrls = await getS3MultipartPartUrls(key, uploadId, partNumbers) - return NextResponse.json({ presignedUrls }) } if (storageProvider === 'blob') { const { getMultipartPartUrls } = await import('@/lib/uploads/providers/blob/client') - const presignedUrls = await getMultipartPartUrls(key, partNumbers, { containerName: config.containerName!, accountName: config.accountName!, accountKey: config.accountKey, connectionString: config.connectionString, }) - return NextResponse.json({ presignedUrls }) } @@ -132,24 +184,32 @@ export const POST = withRouteHandler(async (request: NextRequest) => { } case 'complete': { - const data = await request.json() - const context: StorageContext = data.context || 'knowledge-base' + const data = (await request.json()) as CompleteSingleRequest | CompleteBatchRequest - const config = getStorageConfig(context) + if ('uploads' in data && Array.isArray(data.uploads)) { + const verified = data.uploads.map((upload) => { + const payload = verifyTokenForUser(upload.uploadToken, userId) + return payload ? 
{ payload, parts: upload.parts } : null + }) + + if (verified.some((entry) => entry === null)) { + return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 }) + } + + const verifiedEntries = verified.filter( + (entry): entry is { payload: UploadTokenPayload; parts: unknown } => entry !== null + ) - if ('uploads' in data) { const results = await Promise.all( - data.uploads.map(async (upload: any) => { - const { uploadId, key } = upload + verifiedEntries.map(async ({ payload, parts }) => { + const { uploadId, key, context } = payload + const config = getStorageConfig(context) if (storageProvider === 's3') { const { completeS3MultipartUpload } = await import( '@/lib/uploads/providers/s3/client' ) - const parts = upload.parts // S3 format: { ETag, PartNumber } - - const result = await completeS3MultipartUpload(key, uploadId, parts) - + const result = await completeS3MultipartUpload(key, uploadId, parts as any) return { success: true, location: result.location, @@ -161,15 +221,12 @@ export const POST = withRouteHandler(async (request: NextRequest) => { const { completeMultipartUpload } = await import( '@/lib/uploads/providers/blob/client' ) - const parts = upload.parts // Azure format: { blockId, partNumber } - - const result = await completeMultipartUpload(key, parts, { + const result = await completeMultipartUpload(key, parts as any, { containerName: config.containerName!, accountName: config.accountName!, accountKey: config.accountKey, connectionString: config.connectionString, }) - return { success: true, location: result.location, @@ -182,19 +239,23 @@ export const POST = withRouteHandler(async (request: NextRequest) => { }) ) - logger.info(`Completed ${data.uploads.length} multipart uploads (context: ${context})`) + logger.info(`Completed ${verifiedEntries.length} multipart uploads`) return NextResponse.json({ results }) } - const { uploadId, key, parts } = data + const single = data as CompleteSingleRequest + const tokenPayload = verifyTokenForUser(single.uploadToken, userId) + if (!tokenPayload) { + return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 }) + } + + const { uploadId, key, context } = tokenPayload + const config = getStorageConfig(context) if (storageProvider === 's3') { const { completeS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client') - - const result = await completeS3MultipartUpload(key, uploadId, parts) - + const result = await completeS3MultipartUpload(key, uploadId, single.parts as any) logger.info(`Completed S3 multipart upload for key ${key} (context: ${context})`) - return NextResponse.json({ success: true, location: result.location, @@ -204,16 +265,13 @@ export const POST = withRouteHandler(async (request: NextRequest) => { } if (storageProvider === 'blob') { const { completeMultipartUpload } = await import('@/lib/uploads/providers/blob/client') - - const result = await completeMultipartUpload(key, parts, { + const result = await completeMultipartUpload(key, single.parts as any, { containerName: config.containerName!, accountName: config.accountName!, accountKey: config.accountKey, connectionString: config.connectionString, }) - logger.info(`Completed Azure multipart upload for key ${key} (context: ${context})`) - return NextResponse.json({ success: true, location: result.location, @@ -229,27 +287,27 @@ export const POST = withRouteHandler(async (request: NextRequest) => { } case 'abort': { - const data = await request.json() - const { uploadId, key, context = 'knowledge-base' } = 
data + const data = (await request.json()) as TokenBoundRequest + const tokenPayload = verifyTokenForUser(data.uploadToken, userId) + if (!tokenPayload) { + return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 }) + } - const config = getStorageConfig(context as StorageContext) + const { uploadId, key, context } = tokenPayload + const config = getStorageConfig(context) if (storageProvider === 's3') { const { abortS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client') - await abortS3MultipartUpload(key, uploadId) - logger.info(`Aborted S3 multipart upload for key ${key} (context: ${context})`) } else if (storageProvider === 'blob') { const { abortMultipartUpload } = await import('@/lib/uploads/providers/blob/client') - await abortMultipartUpload(key, { containerName: config.containerName!, accountName: config.accountName!, accountKey: config.accountKey, connectionString: config.connectionString, }) - logger.info(`Aborted Azure multipart upload for key ${key} (context: ${context})`) } else { return NextResponse.json( diff --git a/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts b/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts index d224172f24c..e806947a572 100644 --- a/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts +++ b/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts @@ -604,6 +604,10 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { const startTime = getHighResTime() try { + if (!options.workspaceId) { + throw new Error('workspaceId is required for multipart upload') + } + const initiateResponse = await fetch('/api/files/multipart?action=initiate', { method: 'POST', headers: { 'Content-Type': 'application/json' }, @@ -611,6 +615,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { fileName: file.name, contentType: getFileContentType(file), fileSize: file.size, + workspaceId: options.workspaceId, }), }) @@ -618,7 +623,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { throw new Error(`Failed to initiate multipart upload: ${initiateResponse.statusText}`) } - const { uploadId, key } = await initiateResponse.json() + const { uploadId, key, uploadToken } = await initiateResponse.json() logger.info(`Initiated multipart upload with ID: ${uploadId}`) const chunkSize = UPLOAD_CONFIG.CHUNK_SIZE @@ -629,8 +634,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ - uploadId, - key, + uploadToken, partNumbers, }), }) @@ -639,7 +643,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { await fetch('/api/files/multipart?action=abort', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ uploadId, key }), + body: JSON.stringify({ uploadToken }), }) throw new Error(`Failed to get part URLs: ${partUrlsResponse.statusText}`) } @@ -723,8 +727,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ - uploadId, - key, + uploadToken, parts: uploadedParts, }), }) diff --git a/apps/sim/background/cleanup-logs.ts b/apps/sim/background/cleanup-logs.ts index af272e35c8a..41fb5189357 100644 --- a/apps/sim/background/cleanup-logs.ts +++ 
b/apps/sim/background/cleanup-logs.ts
@@ -4,169 +4,71 @@ import { createLogger } from '@sim/logger'
 import { task } from '@trigger.dev/sdk'
 import { and, inArray, lt } from 'drizzle-orm'
 import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
+import {
+  batchDeleteByWorkspaceAndTimestamp,
+  chunkedBatchDelete,
+  type TableCleanupResult,
+} from '@/lib/cleanup/batch-delete'
 import { snapshotService } from '@/lib/logs/execution/snapshot/service'
 import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
 import { deleteFileMetadata } from '@/lib/uploads/server/metadata'

 const logger = createLogger('CleanupLogs')

-const BATCH_SIZE = 2000
-const MAX_BATCHES_PER_TIER = 10
-
-interface TierResults {
-  total: number
-  deleted: number
-  deleteFailed: number
+interface FileDeleteStats {
   filesTotal: number
   filesDeleted: number
   filesDeleteFailed: number
 }

-function emptyTierResults(): TierResults {
-  return {
-    total: 0,
-    deleted: 0,
-    deleteFailed: 0,
-    filesTotal: 0,
-    filesDeleted: 0,
-    filesDeleteFailed: 0,
-  }
-}
-
-async function deleteExecutionFiles(files: unknown, results: TierResults): Promise<void> {
+async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
   if (!isUsingCloudStorage() || !files || !Array.isArray(files)) return

   const keys = files.filter((f) => f && typeof f === 'object' && f.key).map((f) => f.key as string)
-  results.filesTotal += keys.length
+  stats.filesTotal += keys.length

   await Promise.all(
     keys.map(async (key) => {
       try {
         await StorageService.deleteFile({ key, context: 'execution' })
         await deleteFileMetadata(key)
-        results.filesDeleted++
+        stats.filesDeleted++
       } catch (fileError) {
-        results.filesDeleteFailed++
+        stats.filesDeleteFailed++
         logger.error(`Failed to delete file ${key}:`, { fileError })
       }
     })
   )
 }

-async function cleanupTier(
-  workspaceIds: string[],
-  retentionDate: Date,
-  label: string
-): Promise<TierResults> {
-  const results = emptyTierResults()
-  if (workspaceIds.length === 0) return results
-
-  let batchesProcessed = 0
-  let hasMore = true
-
-  while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
-    const batch = await db
-      .select({
-        id: workflowExecutionLogs.id,
-        files: workflowExecutionLogs.files,
-      })
-      .from(workflowExecutionLogs)
-      .where(
-        and(
-          inArray(workflowExecutionLogs.workspaceId, workspaceIds),
-          lt(workflowExecutionLogs.startedAt, retentionDate)
-        )
-      )
-      .limit(BATCH_SIZE)
-
-    results.total += batch.length
-
-    if (batch.length === 0) {
-      hasMore = false
-      break
-    }
-
-    for (const log of batch) {
-      await deleteExecutionFiles(log.files, results)
-    }
-
-    const logIds = batch.map((log) => log.id)
-    try {
-      const deleted = await db
-        .delete(workflowExecutionLogs)
-        .where(inArray(workflowExecutionLogs.id, logIds))
-        .returning({ id: workflowExecutionLogs.id })
-
-      results.deleted += deleted.length
-    } catch (deleteError) {
-      results.deleteFailed += logIds.length
-      logger.error(`Batch delete failed for ${label}:`, { deleteError })
-    }
-
-    batchesProcessed++
-    hasMore = batch.length === BATCH_SIZE
-
-    logger.info(`[${label}] Batch ${batchesProcessed}: ${batch.length} logs processed`)
-  }
-
-  return results
-}
-
-interface JobLogCleanupResults {
-  deleted: number
-  deleteFailed: number
-}
-
-async function cleanupJobExecutionLogsTier(
+async function cleanupWorkflowExecutionLogs(
   workspaceIds: string[],
   retentionDate: Date,
   label: string
-): Promise<JobLogCleanupResults> {
-  const results: JobLogCleanupResults = { deleted: 0, deleteFailed: 0 }
-  if (workspaceIds.length === 0) return results
-
-
let batchesProcessed = 0
-  let hasMore = true
-
-  while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
-    const batch = await db
-      .select({ id: jobExecutionLogs.id })
-      .from(jobExecutionLogs)
-      .where(
-        and(
-          inArray(jobExecutionLogs.workspaceId, workspaceIds),
-          lt(jobExecutionLogs.startedAt, retentionDate
+): Promise<TableCleanupResult & FileDeleteStats> {
+  const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 }
+
+  const dbStats = await chunkedBatchDelete({
+    tableDef: workflowExecutionLogs,
+    workspaceIds,
+    tableName: `${label}/workflow_execution_logs`,
+    selectChunk: (chunkIds, limit) =>
+      db
+        .select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
+        .from(workflowExecutionLogs)
+        .where(
+          and(
+            inArray(workflowExecutionLogs.workspaceId, chunkIds),
+            lt(workflowExecutionLogs.startedAt, retentionDate)
+          )
         )
-      )
-      .limit(BATCH_SIZE)
-
-    if (batch.length === 0) {
-      hasMore = false
-      break
-    }
+        .limit(limit),
+    onBatch: async (rows) => {
+      for (const row of rows) await deleteExecutionFiles(row.files, fileStats)
+    },
+  })

-    const logIds = batch.map((log) => log.id)
-    try {
-      const deleted = await db
-        .delete(jobExecutionLogs)
-        .where(inArray(jobExecutionLogs.id, logIds))
-        .returning({ id: jobExecutionLogs.id })
-
-      results.deleted += deleted.length
-    } catch (deleteError) {
-      results.deleteFailed += logIds.length
-      logger.error(`Batch delete failed for ${label} (job_execution_logs):`, { deleteError })
-    }
-
-    batchesProcessed++
-    hasMore = batch.length === BATCH_SIZE
-
-    logger.info(
-      `[${label}] job_execution_logs batch ${batchesProcessed}: ${batch.length} rows processed`
-    )
-  }
-
-  return results
+  return { ...dbStats, ...fileStats }
 }

 export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
@@ -190,15 +92,19 @@
     `[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
   )

-  const results = await cleanupTier(workspaceIds, retentionDate, label)
+  const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label)
   logger.info(
-    `[${label}] workflow_execution_logs: ${results.deleted} deleted, ${results.deleteFailed} failed out of ${results.total} candidates`
+    `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
   )

-  const jobLogResults = await cleanupJobExecutionLogsTier(workspaceIds, retentionDate, label)
-  logger.info(
-    `[${label}] job_execution_logs: ${jobLogResults.deleted} deleted, ${jobLogResults.deleteFailed} failed`
-  )
+  await batchDeleteByWorkspaceAndTimestamp({
+    tableDef: jobExecutionLogs,
+    workspaceIdCol: jobExecutionLogs.workspaceId,
+    timestampCol: jobExecutionLogs.startedAt,
+    workspaceIds,
+    retentionDate,
+    tableName: `${label}/job_execution_logs`,
+  })

   // Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
if (payload.plan === 'free') { diff --git a/apps/sim/background/cleanup-soft-deletes.ts b/apps/sim/background/cleanup-soft-deletes.ts index aa5a1ef51b3..51733074075 100644 --- a/apps/sim/background/cleanup-soft-deletes.ts +++ b/apps/sim/background/cleanup-soft-deletes.ts @@ -18,9 +18,8 @@ import { and, inArray, isNotNull, lt } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, - DEFAULT_BATCH_SIZE, - DEFAULT_MAX_BATCHES_PER_TABLE, deleteRowsById, + selectRowsByIdChunks, } from '@/lib/cleanup/batch-delete' import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup' import type { StorageContext } from '@/lib/uploads' @@ -44,35 +43,37 @@ async function selectExpiredWorkspaceFiles( workspaceIds: string[], retentionDate: Date ): Promise { - const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE - const [legacyRows, multiContextRows] = await Promise.all([ - db - .select({ id: workspaceFile.id, key: workspaceFile.key }) - .from(workspaceFile) - .where( - and( - inArray(workspaceFile.workspaceId, workspaceIds), - isNotNull(workspaceFile.deletedAt), - lt(workspaceFile.deletedAt, retentionDate) + selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: workspaceFile.id, key: workspaceFile.key }) + .from(workspaceFile) + .where( + and( + inArray(workspaceFile.workspaceId, chunkIds), + isNotNull(workspaceFile.deletedAt), + lt(workspaceFile.deletedAt, retentionDate) + ) ) - ) - .limit(limit), - db - .select({ - id: workspaceFiles.id, - key: workspaceFiles.key, - context: workspaceFiles.context, - }) - .from(workspaceFiles) - .where( - and( - inArray(workspaceFiles.workspaceId, workspaceIds), - isNotNull(workspaceFiles.deletedAt), - lt(workspaceFiles.deletedAt, retentionDate) + .limit(chunkLimit) + ), + selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ + id: workspaceFiles.id, + key: workspaceFiles.key, + context: workspaceFiles.context, + }) + .from(workspaceFiles) + .where( + and( + inArray(workspaceFiles.workspaceId, chunkIds), + isNotNull(workspaceFiles.deletedAt), + lt(workspaceFiles.deletedAt, retentionDate) + ) ) - ) - .limit(limit), + .limit(chunkLimit) + ), ]) return { @@ -182,17 +183,19 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise // (chats + S3) AND the DB deletes below — selecting twice could return // different subsets above the LIMIT cap and orphan or prematurely purge data. 
const [doomedWorkflows, fileScope] = await Promise.all([ - db - .select({ id: workflow.id }) - .from(workflow) - .where( - and( - inArray(workflow.workspaceId, workspaceIds), - isNotNull(workflow.archivedAt), - lt(workflow.archivedAt, retentionDate) + selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: workflow.id }) + .from(workflow) + .where( + and( + inArray(workflow.workspaceId, chunkIds), + isNotNull(workflow.archivedAt), + lt(workflow.archivedAt, retentionDate) + ) ) - ) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE), + .limit(chunkLimit) + ), selectExpiredWorkspaceFiles(workspaceIds, retentionDate), ]) @@ -200,11 +203,13 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise let chatCleanup: { execute: () => Promise } | null = null if (doomedWorkflowIds.length > 0) { - const doomedChats = await db - .select({ id: copilotChats.id }) - .from(copilotChats) - .where(inArray(copilotChats.workflowId, doomedWorkflowIds)) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE) + const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, (chunkIds, chunkLimit) => + db + .select({ id: copilotChats.id }) + .from(copilotChats) + .where(inArray(copilotChats.workflowId, chunkIds)) + .limit(chunkLimit) + ) const doomedChatIds = doomedChats.map((c) => c.id) if (doomedChatIds.length > 0) { diff --git a/apps/sim/background/cleanup-tasks.ts b/apps/sim/background/cleanup-tasks.ts index ed236882a9d..25ef67461ed 100644 --- a/apps/sim/background/cleanup-tasks.ts +++ b/apps/sim/background/cleanup-tasks.ts @@ -13,9 +13,8 @@ import { and, inArray, lt, sql } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, - DEFAULT_BATCH_SIZE, - DEFAULT_MAX_BATCHES_PER_TABLE, deleteRowsById, + selectRowsByIdChunks, type TableCleanupResult, } from '@/lib/cleanup/batch-delete' import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup' @@ -67,13 +66,15 @@ async function cleanupRunChildren( ): Promise { if (workspaceIds.length === 0) return [] - const runIds = await db - .select({ id: copilotRuns.id }) - .from(copilotRuns) - .where( - and(inArray(copilotRuns.workspaceId, workspaceIds), lt(copilotRuns.updatedAt, retentionDate)) - ) - .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE) + const runIds = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: copilotRuns.id }) + .from(copilotRuns) + .where( + and(inArray(copilotRuns.workspaceId, chunkIds), lt(copilotRuns.updatedAt, retentionDate)) + ) + .limit(chunkLimit) + ) if (runIds.length === 0) { return RUN_CHILD_TABLES.map((t) => ({ table: `${label}/${t.name}`, deleted: 0, failed: 0 })) @@ -107,17 +108,15 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise `[${label}] Processing ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}` ) - // Collect chat IDs before deleting so we can clean up the copilot backend after - const doomedChats = await db - .select({ id: copilotChats.id }) - .from(copilotChats) - .where( - and( - inArray(copilotChats.workspaceId, workspaceIds), - lt(copilotChats.updatedAt, retentionDate) + const doomedChats = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) => + db + .select({ id: copilotChats.id }) + .from(copilotChats) + .where( + and(inArray(copilotChats.workspaceId, chunkIds), lt(copilotChats.updatedAt, retentionDate)) ) - ) - .limit(DEFAULT_BATCH_SIZE 
* DEFAULT_MAX_BATCHES_PER_TABLE) + .limit(chunkLimit) + ) const doomedChatIds = doomedChats.map((c) => c.id) diff --git a/apps/sim/blocks/blocks/slack.ts b/apps/sim/blocks/blocks/slack.ts index 84d903c1754..508c54c7a0e 100644 --- a/apps/sim/blocks/blocks/slack.ts +++ b/apps/sim/blocks/blocks/slack.ts @@ -46,6 +46,10 @@ export const SlackBlock: BlockConfig = { { label: 'Get User Presence', id: 'get_user_presence' }, { label: 'Edit Canvas', id: 'edit_canvas' }, { label: 'Create Channel Canvas', id: 'create_channel_canvas' }, + { label: 'Get Canvas Info', id: 'get_canvas' }, + { label: 'List Canvases', id: 'list_canvases' }, + { label: 'Lookup Canvas Sections', id: 'lookup_canvas_sections' }, + { label: 'Delete Canvas', id: 'delete_canvas' }, { label: 'Create Conversation', id: 'create_conversation' }, { label: 'Invite to Conversation', id: 'invite_to_conversation' }, { label: 'Open View', id: 'open_view' }, @@ -146,6 +150,9 @@ export const SlackBlock: BlockConfig = { 'get_user', 'get_user_presence', 'edit_canvas', + 'get_canvas', + 'lookup_canvas_sections', + 'delete_canvas', 'create_conversation', 'open_view', 'update_view', @@ -160,7 +167,11 @@ export const SlackBlock: BlockConfig = { }, } }, - required: true, + required: { + field: 'operation', + value: 'list_canvases', + not: true, + }, }, { id: 'manualChannel', @@ -182,6 +193,9 @@ export const SlackBlock: BlockConfig = { 'get_user', 'get_user_presence', 'edit_canvas', + 'get_canvas', + 'lookup_canvas_sections', + 'delete_canvas', 'create_conversation', 'open_view', 'update_view', @@ -196,7 +210,11 @@ export const SlackBlock: BlockConfig = { }, } }, - required: true, + required: { + field: 'operation', + value: 'list_canvases', + not: true, + }, }, { id: 'dmUserId', @@ -820,6 +838,121 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`, value: 'create_channel_canvas', }, }, + // Get Canvas specific fields + { + id: 'getCanvasId', + title: 'Canvas ID', + type: 'short-input', + placeholder: 'Enter canvas ID (e.g., F1234ABCD)', + condition: { + field: 'operation', + value: 'get_canvas', + }, + required: true, + }, + // List Canvases specific fields + { + id: 'canvasListCount', + title: 'Canvas Limit', + type: 'short-input', + placeholder: '100', + condition: { + field: 'operation', + value: 'list_canvases', + }, + mode: 'advanced', + }, + { + id: 'canvasListPage', + title: 'Page', + type: 'short-input', + placeholder: '1', + condition: { + field: 'operation', + value: 'list_canvases', + }, + mode: 'advanced', + }, + { + id: 'canvasListUser', + title: 'User ID', + type: 'short-input', + placeholder: 'Optional creator filter (e.g., U1234567890)', + condition: { + field: 'operation', + value: 'list_canvases', + }, + mode: 'advanced', + }, + { + id: 'canvasListTsFrom', + title: 'Created After', + type: 'short-input', + placeholder: 'Unix timestamp (e.g., 123456789)', + condition: { + field: 'operation', + value: 'list_canvases', + }, + mode: 'advanced', + }, + { + id: 'canvasListTsTo', + title: 'Created Before', + type: 'short-input', + placeholder: 'Unix timestamp (e.g., 123456789)', + condition: { + field: 'operation', + value: 'list_canvases', + }, + mode: 'advanced', + }, + { + id: 'canvasListTeamId', + title: 'Team ID', + type: 'short-input', + placeholder: 'Encoded team ID (org tokens only)', + condition: { + field: 'operation', + value: 'list_canvases', + }, + mode: 'advanced', + }, + // Lookup Canvas Sections specific fields + { + id: 'lookupCanvasId', + title: 'Canvas ID', + type: 'short-input', + 
placeholder: 'Enter canvas ID (e.g., F1234ABCD)', + condition: { + field: 'operation', + value: 'lookup_canvas_sections', + }, + required: true, + }, + { + id: 'sectionCriteria', + title: 'Section Criteria', + type: 'code', + language: 'json', + placeholder: '{"section_types":["h1"],"contains_text":"Roadmap"}', + condition: { + field: 'operation', + value: 'lookup_canvas_sections', + }, + required: true, + }, + // Delete Canvas specific fields + { + id: 'deleteCanvasId', + title: 'Canvas ID', + type: 'short-input', + placeholder: 'Enter canvas ID (e.g., F1234ABCD)', + condition: { + field: 'operation', + value: 'delete_canvas', + }, + required: true, + }, // Create Conversation specific fields { id: 'conversationName', @@ -1058,6 +1191,10 @@ Do not include any explanations, markdown formatting, or other text outside the 'slack_get_user_presence', 'slack_edit_canvas', 'slack_create_channel_canvas', + 'slack_get_canvas', + 'slack_list_canvases', + 'slack_lookup_canvas_sections', + 'slack_delete_canvas', 'slack_create_conversation', 'slack_invite_to_conversation', 'slack_open_view', @@ -1106,6 +1243,14 @@ Do not include any explanations, markdown formatting, or other text outside the return 'slack_edit_canvas' case 'create_channel_canvas': return 'slack_create_channel_canvas' + case 'get_canvas': + return 'slack_get_canvas' + case 'list_canvases': + return 'slack_list_canvases' + case 'lookup_canvas_sections': + return 'slack_lookup_canvas_sections' + case 'delete_canvas': + return 'slack_delete_canvas' case 'create_conversation': return 'slack_create_conversation' case 'invite_to_conversation': @@ -1164,6 +1309,16 @@ Do not include any explanations, markdown formatting, or other text outside the canvasTitle, channelCanvasTitle, channelCanvasContent, + getCanvasId, + canvasListCount, + canvasListPage, + canvasListUser, + canvasListTsFrom, + canvasListTsTo, + canvasListTeamId, + lookupCanvasId, + sectionCriteria, + deleteCanvasId, conversationName, isPrivate, teamId, @@ -1343,6 +1498,46 @@ Do not include any explanations, markdown formatting, or other text outside the } break + case 'get_canvas': + baseParams.canvasId = getCanvasId + break + + case 'list_canvases': + if (canvasListCount) { + const parsedCount = Number.parseInt(canvasListCount, 10) + if (!Number.isNaN(parsedCount) && parsedCount > 0) { + baseParams.count = parsedCount + } + } + if (canvasListPage) { + const parsedPage = Number.parseInt(canvasListPage, 10) + if (!Number.isNaN(parsedPage) && parsedPage > 0) { + baseParams.page = parsedPage + } + } + if (canvasListUser) { + baseParams.user = String(canvasListUser).trim() + } + if (canvasListTsFrom) { + baseParams.tsFrom = String(canvasListTsFrom).trim() + } + if (canvasListTsTo) { + baseParams.tsTo = String(canvasListTsTo).trim() + } + if (canvasListTeamId) { + baseParams.teamId = String(canvasListTeamId).trim() + } + break + + case 'lookup_canvas_sections': + baseParams.canvasId = lookupCanvasId + baseParams.criteria = sectionCriteria + break + + case 'delete_canvas': + baseParams.canvasId = deleteCanvasId + break + case 'create_conversation': baseParams.name = conversationName baseParams.isPrivate = isPrivate === 'true' @@ -1461,6 +1656,23 @@ Do not include any explanations, markdown formatting, or other text outside the // Create Channel Canvas inputs channelCanvasTitle: { type: 'string', description: 'Title for channel canvas' }, channelCanvasContent: { type: 'string', description: 'Content for channel canvas' }, + // Canvas management inputs + getCanvasId: { type: 'string', 
description: 'Canvas ID to retrieve' }, + canvasListCount: { type: 'string', description: 'Maximum number of canvases to return' }, + canvasListPage: { type: 'string', description: 'Canvas list page number' }, + canvasListUser: { type: 'string', description: 'Optional canvas creator user filter' }, + canvasListTsFrom: { + type: 'string', + description: 'Filter canvases created after this timestamp', + }, + canvasListTsTo: { + type: 'string', + description: 'Filter canvases created before this timestamp', + }, + canvasListTeamId: { type: 'string', description: 'Encoded team ID for org tokens' }, + lookupCanvasId: { type: 'string', description: 'Canvas ID to search for sections' }, + sectionCriteria: { type: 'json', description: 'Canvas section lookup criteria' }, + deleteCanvasId: { type: 'string', description: 'Canvas ID to delete' }, // Create Conversation inputs conversationName: { type: 'string', description: 'Name for the new channel' }, isPrivate: { type: 'string', description: 'Create as private channel (true/false)' }, @@ -1511,6 +1723,26 @@ Do not include any explanations, markdown formatting, or other text outside the // slack_canvas outputs canvas_id: { type: 'string', description: 'Canvas identifier for created canvases' }, title: { type: 'string', description: 'Canvas title' }, + canvas: { + type: 'json', + description: 'Canvas file metadata returned by Slack', + }, + canvases: { + type: 'json', + description: 'Array of canvas file objects returned by Slack', + }, + paging: { + type: 'json', + description: 'Pagination information for listed canvases', + }, + sections: { + type: 'json', + description: 'Canvas section IDs returned by Slack section lookup', + }, + ok: { + type: 'boolean', + description: 'Whether Slack completed the canvas operation successfully', + }, // slack_message_reader outputs (read operation) messages: { diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 170fc805acc..05019d953e9 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -34,6 +34,7 @@ import { type ExecutionContext, getNextExecutionOrder, type NormalizedBlockOutput, + type StreamingExecution, } from '@/executor/types' import { streamingResponseFormatProcessor } from '@/executor/utils' import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors' @@ -140,7 +141,7 @@ export class BlockExecutor { let normalizedOutput: NormalizedBlockOutput if (isStreamingExecution) { - const streamingExec = output as { stream: ReadableStream; execution: any } + const streamingExec = output as StreamingExecution if (ctx.onStream) { await this.handleStreamingExecution( @@ -602,7 +603,7 @@ export class BlockExecutor { ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, - streamingExec: { stream: ReadableStream; execution: any }, + streamingExec: StreamingExecution, resolvedInputs: Record, selectedOutputs: string[] ): Promise { @@ -613,56 +614,39 @@ export class BlockExecutor { (block.config?.params as Record | undefined)?.responseFormat ?? 
(block.config as Record | undefined)?.responseFormat - const stream = streamingExec.stream - if (typeof stream.tee !== 'function') { - await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs) - return - } + const sourceReader = streamingExec.stream.getReader() + const decoder = new TextDecoder() + const accumulated: string[] = [] + let drainError: unknown + let sourceFullyDrained = false - const [clientStream, executorStream] = stream.tee() + const clientSource = new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await sourceReader.read() + if (done) { + const tail = decoder.decode() + if (tail) accumulated.push(tail) + sourceFullyDrained = true + controller.close() + return + } + accumulated.push(decoder.decode(value, { stream: true })) + controller.enqueue(value) + } catch (error) { + drainError = error + controller.error(error) + } + }, + async cancel(reason) { + try { + await sourceReader.cancel(reason) + } catch {} + }, + }) const processedClientStream = streamingResponseFormatProcessor.processStream( - clientStream, - blockId, - selectedOutputs, - responseFormat - ) - - const clientStreamingExec = { - ...streamingExec, - stream: processedClientStream, - } - - const executorConsumption = this.consumeExecutorStream( - executorStream, - streamingExec, - blockId, - responseFormat - ) - - const clientConsumption = (async () => { - try { - await ctx.onStream?.(clientStreamingExec) - } catch (error) { - this.execLogger.error('Error in onStream callback', { blockId, error }) - // Cancel the client stream to release the tee'd buffer - await processedClientStream.cancel().catch(() => {}) - } - })() - - await Promise.all([clientConsumption, executorConsumption]) - } - - private async forwardStream( - ctx: ExecutionContext, - blockId: string, - streamingExec: { stream: ReadableStream; execution: any }, - stream: ReadableStream, - responseFormat: any, - selectedOutputs: string[] - ): Promise { - const processedStream = streamingResponseFormatProcessor.processStream( - stream, + clientSource, blockId, selectedOutputs, responseFormat @@ -670,72 +654,75 @@ export class BlockExecutor { try { await ctx.onStream?.({ - ...streamingExec, - stream: processedStream, + stream: processedClientStream, + execution: streamingExec.execution, }) } catch (error) { this.execLogger.error('Error in onStream callback', { blockId, error }) - await processedStream.cancel().catch(() => {}) - } - } - - private async consumeExecutorStream( - stream: ReadableStream, - streamingExec: { execution: any }, - blockId: string, - responseFormat: any - ): Promise { - const reader = stream.getReader() - const decoder = new TextDecoder() - const chunks: string[] = [] - - try { - while (true) { - const { done, value } = await reader.read() - if (done) break - chunks.push(decoder.decode(value, { stream: true })) - } - const tail = decoder.decode() - if (tail) chunks.push(tail) - } catch (error) { - this.execLogger.error('Error reading executor stream for block', { blockId, error }) + await processedClientStream.cancel().catch(() => {}) } finally { try { - await reader.cancel().catch(() => {}) + sourceReader.releaseLock() } catch {} } - const fullContent = chunks.join('') + if (drainError) { + this.execLogger.error('Error reading stream for block', { blockId, error: drainError }) + return + } + + // If the onStream consumer exited before the source drained (e.g. it caught + // an internal error and returned normally), `accumulated` holds a truncated + // response. 
Persisting that to memory or setting it as the block output + // would corrupt downstream state — skip and log instead. + if (!sourceFullyDrained) { + this.execLogger.warn( + 'Stream consumer exited before source drained; skipping content persistence', + { + blockId, + } + ) + return + } + + const fullContent = accumulated.join('') if (!fullContent) { return } const executionOutput = streamingExec.execution?.output - if (!executionOutput || typeof executionOutput !== 'object') { - return + if (executionOutput && typeof executionOutput === 'object') { + let parsedForFormat = false + if (responseFormat) { + try { + const parsed = JSON.parse(fullContent.trim()) + streamingExec.execution.output = { + ...parsed, + tokens: executionOutput.tokens, + toolCalls: executionOutput.toolCalls, + providerTiming: executionOutput.providerTiming, + cost: executionOutput.cost, + model: executionOutput.model, + } + parsedForFormat = true + } catch (error) { + this.execLogger.warn('Failed to parse streamed content for response format', { + blockId, + error, + }) + } + } + if (!parsedForFormat) { + executionOutput.content = fullContent + } } - if (responseFormat) { + if (streamingExec.onFullContent) { try { - const parsed = JSON.parse(fullContent.trim()) - - streamingExec.execution.output = { - ...parsed, - tokens: executionOutput.tokens, - toolCalls: executionOutput.toolCalls, - providerTiming: executionOutput.providerTiming, - cost: executionOutput.cost, - model: executionOutput.model, - } - return + await streamingExec.onFullContent(fullContent) } catch (error) { - this.execLogger.warn('Failed to parse streamed content for response format', { - blockId, - error, - }) + this.execLogger.error('onFullContent callback failed', { blockId, error }) } } - - executionOutput.content = fullContent } } diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts index 31c43b6582d..4145252fc08 100644 --- a/apps/sim/executor/handlers/agent/agent-handler.ts +++ b/apps/sim/executor/handlers/agent/agent-handler.ts @@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler { streamingExec: StreamingExecution ): StreamingExecution { return { - stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs), + stream: streamingExec.stream, execution: streamingExec.execution, + onFullContent: async (content: string) => { + if (!content.trim()) return + try { + await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content }) + } catch (error) { + logger.error('Failed to persist streaming response:', error) + } + }, } } diff --git a/apps/sim/executor/handlers/agent/memory.ts b/apps/sim/executor/handlers/agent/memory.ts index 6428f0b7607..fcba3628226 100644 --- a/apps/sim/executor/handlers/agent/memory.ts +++ b/apps/sim/executor/handlers/agent/memory.ts @@ -111,35 +111,6 @@ export class Memory { }) } - wrapStreamForPersistence( - stream: ReadableStream, - ctx: ExecutionContext, - inputs: AgentInputs - ): ReadableStream { - const chunks: string[] = [] - const decoder = new TextDecoder() - - const transformStream = new TransformStream({ - transform: (chunk, controller) => { - controller.enqueue(chunk) - const decoded = decoder.decode(chunk, { stream: true }) - chunks.push(decoded) - }, - - flush: () => { - const content = chunks.join('') - if (content.trim()) { - this.appendToMemory(ctx, inputs, { - role: 'assistant', - content, - }).catch((error) => logger.error('Failed to persist streaming response:', error)) - } - }, - }) - - 
return stream.pipeThrough(transformStream)
-  }
-
   private requireWorkspaceId(ctx: ExecutionContext): string {
     if (!ctx.workspaceId) {
       throw new Error('workspaceId is required for memory operations')
diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts
index 00caff1d9ef..b8b0d4cfa0d 100644
--- a/apps/sim/executor/types.ts
+++ b/apps/sim/executor/types.ts
@@ -359,6 +359,12 @@ export interface ExecutionResult {
 export interface StreamingExecution {
   stream: ReadableStream
   execution: ExecutionResult & { isStreaming?: boolean }
+  /**
+   * Invoked with the assembled response text after the stream drains. Lets agent
+   * blocks persist the full response without interposing a TransformStream on a
+   * fetch-backed source — that pattern amplifies memory on Bun via #28035.
+   */
+  onFullContent?: (content: string) => void | Promise<void>
 }

 export interface BlockExecutor {
diff --git a/apps/sim/lib/cleanup/batch-delete.ts b/apps/sim/lib/cleanup/batch-delete.ts
index 78d0f8867ad..dbf6cf21c51 100644
--- a/apps/sim/lib/cleanup/batch-delete.ts
+++ b/apps/sim/lib/cleanup/batch-delete.ts
@@ -7,6 +7,55 @@ const logger = createLogger('BatchDelete')

 export const DEFAULT_BATCH_SIZE = 2000
 export const DEFAULT_MAX_BATCHES_PER_TABLE = 10

+/**
+ * Split workspaceIds into groups of this size before running SELECT/DELETE. Large
+ * IN lists combined with `started_at < X` force Postgres to probe every
+ * workspace range in the composite index, which blows the 90s statement timeout
+ * at the scale of the full free tier.
+ */
+export const DEFAULT_WORKSPACE_CHUNK_SIZE = 50
+
+export function chunkArray<T>(arr: T[], size: number): T[][] {
+  const out: T[][] = []
+  for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size))
+  return out
+}
+
+export interface SelectByIdChunksOptions {
+  /** Cap on rows returned across all chunks. Defaults to a full per-table cleanup budget. */
+  overallLimit?: number
+  chunkSize?: number
+}
+
+/**
+ * Run a SELECT query once per ID chunk and concatenate results up to
+ * `overallLimit`. Each chunk's query is passed the remaining row budget so the
+ * total never exceeds the cap. Use this when you need the selected row set
+ * (e.g. to drive S3 or copilot-backend cleanup alongside the DB delete).
+ *
+ * Works for any large ID set — workspace IDs, workflow IDs, etc. Avoids
+ * sending one massive `IN (...)` list that would blow Postgres's statement
+ * timeout.
+ */
+export async function selectRowsByIdChunks<T>(
+  ids: string[],
+  query: (chunkIds: string[], chunkLimit: number) => Promise<T[]>,
+  {
+    overallLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
+    chunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
+  }: SelectByIdChunksOptions = {}
+): Promise<T[]> {
+  if (ids.length === 0) return []
+
+  const rows: T[] = []
+  for (const chunkIds of chunkArray(ids, chunkSize)) {
+    if (rows.length >= overallLimit) break
+    const remaining = overallLimit - rows.length
+    const chunkRows = await query(chunkIds, remaining)
+    rows.push(...chunkRows)
+  }
+  return rows
+}
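Not part of the diff: a minimal usage sketch of `selectRowsByIdChunks`, mirroring the cleanup-soft-deletes call site earlier in this PR. It assumes the cleanup jobs' Drizzle `db` instance and `workflow` schema, plus `workspaceIds` and `retentionDate` already in scope inside an async task body.

import { and, inArray, isNotNull, lt } from 'drizzle-orm'
import { selectRowsByIdChunks } from '@/lib/cleanup/batch-delete'

// Gather archived workflows across many workspaces: 50 workspace IDs per
// IN (...) list, each chunk query capped at the remaining row budget, so the
// combined result never exceeds the per-table cleanup limit (2000 * 10 rows).
const doomedWorkflows = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
  db
    .select({ id: workflow.id })
    .from(workflow)
    .where(
      and(
        inArray(workflow.workspaceId, chunkIds),
        isNotNull(workflow.archivedAt),
        lt(workflow.archivedAt, retentionDate)
      )
    )
    .limit(chunkLimit)
)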

@@ -14,36 +63,48 @@ export interface TableCleanupResult {
   table: string
   deleted: number
   failed: number
 }

-export interface BatchDeleteOptions {
+export interface ChunkedBatchDeleteOptions<TRow extends { id: unknown }> {
   tableDef: PgTable
-  workspaceIdCol: PgColumn
-  timestampCol: PgColumn
   workspaceIds: string[]
-  retentionDate: Date
   tableName: string
-  /** When true, also requires `timestampCol IS NOT NULL` (soft-delete semantics). */
-  requireTimestampNotNull?: boolean
+  /** SELECT eligible rows for one workspace chunk. The result must include `id`. */
+  selectChunk: (chunkIds: string[], limit: number) => Promise<TRow[]>
+  /** Runs between SELECT and DELETE; receives the just-selected rows. */
+  onBatch?: (rows: TRow[]) => Promise<void>
   batchSize?: number
+  /** Max batches per workspace chunk. */
   maxBatches?: number
+  /**
+   * Hard cap on rows processed (deleted + failed) across all chunks per call.
+   * Defaults to `DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE`. Cron
+   * runs frequently enough to catch up the backlog over multiple invocations.
+   */
+  totalRowLimit?: number
+  workspaceChunkSize?: number
 }

 /**
- * Iteratively delete rows in a table matching a workspace + time-based predicate.
+ * Inner loop primitive for cleanup jobs.
  *
- * Uses a SELECT-with-LIMIT → DELETE-by-ID pattern to keep each round bounded in
- * memory and I/O (PostgreSQL DELETE does not support LIMIT directly).
+ * For each workspace chunk: SELECT a batch of eligible rows → run optional
+ * `onBatch` hook (e.g. to delete S3 files) → DELETE those rows by ID. Repeats
+ * until exhausted or `maxBatches` is hit, then moves to the next chunk. Stops
+ * the whole call once `totalRowLimit` rows have been processed.
+ *
+ * Workspace IDs are chunked before the SELECT — see
+ * `DEFAULT_WORKSPACE_CHUNK_SIZE` for why.
  */
-export async function batchDeleteByWorkspaceAndTimestamp({
+export async function chunkedBatchDelete<TRow extends { id: unknown }>({
   tableDef,
-  workspaceIdCol,
-  timestampCol,
   workspaceIds,
-  retentionDate,
   tableName,
-  requireTimestampNotNull = false,
+  selectChunk,
+  onBatch,
   batchSize = DEFAULT_BATCH_SIZE,
   maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE,
-}: BatchDeleteOptions): Promise<TableCleanupResult> {
+  totalRowLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
+  workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
+}: ChunkedBatchDeleteOptions<TRow>): Promise<TableCleanupResult> {
   const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }

   if (workspaceIds.length === 0) {
@@ -51,48 +112,108 @@
     return result
   }

-  const predicates = [inArray(workspaceIdCol, workspaceIds), lt(timestampCol, retentionDate)]
-  if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol))
-  const whereClause = and(...predicates)
+  const chunks = chunkArray(workspaceIds, workspaceChunkSize)
+  let stoppedEarly = false
+
+  for (const [chunkIdx, chunkIds] of chunks.entries()) {
+    if (result.deleted + result.failed >= totalRowLimit) {
+      stoppedEarly = true
+      break
+    }

-  let batchesProcessed = 0
-  let hasMore = true
+    let batchesProcessed = 0
+    let hasMore = true

-  while (hasMore && batchesProcessed < maxBatches) {
-    try {
-      const batch = await db
-        .select({ id: sql`id` })
-        .from(tableDef)
-        .where(whereClause)
-        .limit(batchSize)
+    while (
+      hasMore &&
+      batchesProcessed < maxBatches &&
+      result.deleted + result.failed < totalRowLimit
+    ) {
+      let rows: TRow[] = []
+      try {
+        rows = await selectChunk(chunkIds, batchSize)
+
+        if (rows.length === 0) {
+          hasMore = false
+          break
+        }

-      if (batch.length === 0) {
-        logger.info(`[${tableName}] No expired rows found`)
+        if (onBatch) await onBatch(rows)
+
+        const ids = rows.map((r) => r.id)
+        const deleted = await db
+          .delete(tableDef)
+          .where(inArray(sql`id`, ids))
+          .returning({ id: sql`id` })
+
+        result.deleted += deleted.length
+        hasMore = rows.length === batchSize
+        batchesProcessed++
+      } catch (error) {
+        // Count rows we tried to delete; SELECT-stage errors leave rows=[].
+        result.failed += rows.length
+        logger.error(
+          `[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${chunks.length}, ${rows.length} rows):`,
+          { error }
+        )
         hasMore = false
-        break
       }
-
-      const ids = batch.map((r) => r.id)
-      const deleted = await db
-        .delete(tableDef)
-        .where(inArray(sql`id`, ids))
-        .returning({ id: sql`id` })
-
-      result.deleted += deleted.length
-      hasMore = batch.length === batchSize
-      batchesProcessed++
-
-      logger.info(`[${tableName}] Batch ${batchesProcessed}: deleted ${deleted.length} rows`)
-    } catch (error) {
-      result.failed++
-      logger.error(`[${tableName}] Batch delete failed:`, { error })
-      hasMore = false
     }
   }

+  logger.info(
+    `[${tableName}] Complete: ${result.deleted} deleted, ${result.failed} failed across ${chunks.length} chunks${stoppedEarly ? ' (row-limit reached, remaining chunks deferred to next run)' : ''}`
+  )
+
   return result
 }

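Again not part of the diff: a condensed sketch of how cleanup-logs.ts earlier in this PR drives `chunkedBatchDelete` with a per-row side effect. It assumes that file's `db`, `workflowExecutionLogs` schema, `deleteExecutionFiles` helper, `fileStats`, `workspaceIds`, and `retentionDate` are in scope; the 'free/...' tableName label is illustrative.

import { and, inArray, lt } from 'drizzle-orm'
import { chunkedBatchDelete } from '@/lib/cleanup/batch-delete'

// Each round: SELECT up to batchSize expired logs for one workspace chunk,
// delete their attached files via onBatch, then DELETE the rows by ID.
const dbStats = await chunkedBatchDelete({
  tableDef: workflowExecutionLogs,
  workspaceIds,
  tableName: 'free/workflow_execution_logs',
  selectChunk: (chunkIds, limit) =>
    db
      .select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
      .from(workflowExecutionLogs)
      .where(
        and(
          inArray(workflowExecutionLogs.workspaceId, chunkIds),
          lt(workflowExecutionLogs.startedAt, retentionDate)
        )
      )
      .limit(limit),
  onBatch: async (rows) => {
    for (const row of rows) await deleteExecutionFiles(row.files, fileStats)
  },
})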
+export interface BatchDeleteOptions {
+  tableDef: PgTable
+  workspaceIdCol: PgColumn
+  timestampCol: PgColumn
+  workspaceIds: string[]
+  retentionDate: Date
+  tableName: string
+  /** When true, also requires `timestampCol IS NOT NULL` (soft-delete semantics). */
+  requireTimestampNotNull?: boolean
+  batchSize?: number
+  maxBatches?: number
+  workspaceChunkSize?: number
+}
+
+/**
+ * Convenience wrapper around `chunkedBatchDelete` for the common case: delete
+ * rows where `workspaceId IN (...) AND timestamp < retentionDate`. Use this
+ * when there's no per-row side effect (e.g. no S3 files to clean up alongside).
+ */
+export async function batchDeleteByWorkspaceAndTimestamp({
+  tableDef,
+  workspaceIdCol,
+  timestampCol,
+  workspaceIds,
+  retentionDate,
+  tableName,
+  requireTimestampNotNull = false,
+  ...rest
+}: BatchDeleteOptions): Promise<TableCleanupResult> {
+  return chunkedBatchDelete({
+    tableDef,
+    workspaceIds,
+    tableName,
+    selectChunk: (chunkIds, limit) => {
+      const predicates = [inArray(workspaceIdCol, chunkIds), lt(timestampCol, retentionDate)]
+      if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol))
+      return db
+        .select({ id: sql`id` })
+        .from(tableDef)
+        .where(and(...predicates))
+        .limit(limit)
+    },
+    ...rest,
+  })
+}

 /**
  * Delete rows by an explicit list of IDs. 
Use this when the IDs were selected * upstream (e.g., to drive external cleanup like S3 deletes or a backend API diff --git a/apps/sim/lib/copilot/tools/handlers/management/manage-credential.ts b/apps/sim/lib/copilot/tools/handlers/management/manage-credential.ts index 03422859dcc..5f5f4574747 100644 --- a/apps/sim/lib/copilot/tools/handlers/management/manage-credential.ts +++ b/apps/sim/lib/copilot/tools/handlers/management/manage-credential.ts @@ -3,10 +3,11 @@ import { credential } from '@sim/db/schema' import { toError } from '@sim/utils/errors' import { eq } from 'drizzle-orm' import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types' +import { getCredentialActorContext } from '@/lib/credentials/access' export function executeManageCredential( rawParams: Record, - _context: ExecutionContext + context: ExecutionContext ): Promise { const params = rawParams as { operation: string @@ -17,26 +18,30 @@ export function executeManageCredential( const { operation, displayName } = params return (async () => { try { + if (!context?.userId) { + return { success: false, error: 'Authentication required' } + } + switch (operation) { case 'rename': { const credentialId = params.credentialId if (!credentialId) return { success: false, error: 'credentialId is required for rename' } if (!displayName) return { success: false, error: 'displayName is required for rename' } - const [row] = await db - .select({ - id: credential.id, - type: credential.type, - displayName: credential.displayName, - }) - .from(credential) - .where(eq(credential.id, credentialId)) - .limit(1) - if (!row) return { success: false, error: 'Credential not found' } - if (row.type !== 'oauth') + + const actor = await getCredentialActorContext(credentialId, context.userId) + if (!actor.credential || !actor.hasWorkspaceAccess) { + return { success: false, error: 'Credential not found' } + } + if (actor.credential.type !== 'oauth') { return { success: false, error: 'Only OAuth credentials can be managed with this tool.', } + } + if (!actor.canWriteWorkspace && !actor.isAdmin) { + return { success: false, error: 'Write access required to rename this credential' } + } + await db .update(credential) .set({ displayName, updatedAt: new Date() }) @@ -53,12 +58,16 @@ export function executeManageCredential( const failed: string[] = [] for (const id of ids) { - const [row] = await db - .select({ id: credential.id, type: credential.type }) - .from(credential) - .where(eq(credential.id, id)) - .limit(1) - if (!row || row.type !== 'oauth') { + const actor = await getCredentialActorContext(id, context.userId) + if ( + !actor.credential || + !actor.hasWorkspaceAccess || + actor.credential.type !== 'oauth' + ) { + failed.push(id) + continue + } + if (!actor.canWriteWorkspace && !actor.isAdmin) { failed.push(id) continue } diff --git a/apps/sim/lib/copilot/tools/handlers/restore-resource.ts b/apps/sim/lib/copilot/tools/handlers/restore-resource.ts index bbe90f76cfc..9062d9b1352 100644 --- a/apps/sim/lib/copilot/tools/handlers/restore-resource.ts +++ b/apps/sim/lib/copilot/tools/handlers/restore-resource.ts @@ -1,6 +1,9 @@ +import { db } from '@sim/db' +import { knowledgeBase } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' +import { eq } from 'drizzle-orm' import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types' import { restoreKnowledgeBase } from '@/lib/knowledge/service' import { getTableById, 
restoreTable } from '@/lib/table/service' @@ -10,6 +13,8 @@ import { } from '@/lib/uploads/contexts/workspace/workspace-file-manager' import { restoreWorkflow } from '@/lib/workflows/lifecycle' import { performRestoreFolder } from '@/lib/workflows/orchestration/folder-lifecycle' +import { getWorkflowById } from '@/lib/workflows/utils' +import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' const logger = createLogger('RestoreResource') @@ -33,10 +38,25 @@ export async function executeRestoreResource( } const requestId = generateId().slice(0, 8) + const callerWorkspaceId = context.workspaceId + + const hasWriteAccess = async (resourceWorkspaceId: string | null | undefined) => { + if (!resourceWorkspaceId || resourceWorkspaceId !== callerWorkspaceId) return false + const permission = await getUserEntityPermissions( + context.userId, + 'workspace', + resourceWorkspaceId + ) + return permission === 'write' || permission === 'admin' + } try { switch (type) { case 'workflow': { + const existing = await getWorkflowById(id, { includeArchived: true }) + if (!existing || !(await hasWriteAccess(existing.workspaceId))) { + return { success: false, error: 'Workflow not found' } + } const result = await restoreWorkflow(id, { requestId }) if (!result.restored) { return { success: false, error: 'Workflow not found or not archived' } @@ -50,9 +70,13 @@ export async function executeRestoreResource( } case 'table': { + const existing = await getTableById(id, { includeArchived: true }) + if (!existing || !(await hasWriteAccess(existing.workspaceId))) { + return { success: false, error: 'Table not found' } + } await restoreTable(id, requestId) const table = await getTableById(id) - const tableName = table?.name || id + const tableName = table?.name || existing.name logger.info('Table restored via copilot', { tableId: id, name: tableName }) return { success: true, @@ -62,6 +86,9 @@ export async function executeRestoreResource( } case 'file': { + if (!(await hasWriteAccess(context.workspaceId))) { + return { success: false, error: 'File not found' } + } await restoreWorkspaceFile(context.workspaceId, id) const fileRecord = await getWorkspaceFile(context.workspaceId, id) const fileName = fileRecord?.name || id @@ -74,6 +101,14 @@ export async function executeRestoreResource( } case 'knowledgebase': { + const [existing] = await db + .select({ workspaceId: knowledgeBase.workspaceId }) + .from(knowledgeBase) + .where(eq(knowledgeBase.id, id)) + .limit(1) + if (!existing || !(await hasWriteAccess(existing.workspaceId))) { + return { success: false, error: 'Knowledge base not found' } + } await restoreKnowledgeBase(id, requestId) logger.info('Knowledge base restored via copilot', { knowledgeBaseId: id }) return { @@ -83,6 +118,9 @@ export async function executeRestoreResource( } case 'folder': { + if (!(await hasWriteAccess(context.workspaceId))) { + return { success: false, error: 'Folder not found' } + } const result = await performRestoreFolder({ folderId: id, workspaceId: context.workspaceId, diff --git a/apps/sim/lib/copilot/tools/handlers/workflow/mutations.ts b/apps/sim/lib/copilot/tools/handlers/workflow/mutations.ts index 7e1d8281830..f70d5ff64bf 100644 --- a/apps/sim/lib/copilot/tools/handlers/workflow/mutations.ts +++ b/apps/sim/lib/copilot/tools/handlers/workflow/mutations.ts @@ -28,6 +28,7 @@ import { setWorkflowVariables, updateFolderRecord, updateWorkflowRecord, + verifyFolderWorkspace, } from '@/lib/workflows/utils' import { hasExecutionResult } from '@/executor/utils/errors' 
import type { BlockState, WorkflowState } from '@/stores/workflows/workflow/types' @@ -522,7 +523,13 @@ export async function executeMoveWorkflow( for (const workflowId of workflowIds) { try { - await ensureWorkflowAccess(workflowId, context.userId, 'write') + const { workspaceId } = await ensureWorkflowAccess(workflowId, context.userId, 'write') + if (folderId) { + if (!workspaceId || !(await verifyFolderWorkspace(folderId, workspaceId))) { + failed.push(workflowId) + continue + } + } assertWorkflowMutationNotAborted(context) await updateWorkflowRecord(workflowId, { folderId }) moved.push(workflowId) @@ -562,6 +569,14 @@ export async function executeMoveFolder( const workspaceId = context.workspaceId || (await getDefaultWorkspaceId(context.userId)) await ensureWorkspaceAccess(workspaceId, context.userId, 'write') + + if (!(await verifyFolderWorkspace(folderId, workspaceId))) { + return { success: false, error: 'Folder not found' } + } + if (parentId && !(await verifyFolderWorkspace(parentId, workspaceId))) { + return { success: false, error: 'Parent folder not found' } + } + assertWorkflowMutationNotAborted(context) await updateFolderRecord(folderId, { parentId }) @@ -1007,6 +1022,11 @@ export async function executeRenameFolder( const workspaceId = context.workspaceId || (await getDefaultWorkspaceId(context.userId)) await ensureWorkspaceAccess(workspaceId, context.userId, 'write') + + if (!(await verifyFolderWorkspace(folderId, workspaceId))) { + return { success: false, error: 'Folder not found' } + } + assertWorkflowMutationNotAborted(context) await updateFolderRecord(folderId, { name }) diff --git a/apps/sim/lib/copilot/tools/server/jobs/get-job-logs.ts b/apps/sim/lib/copilot/tools/server/jobs/get-job-logs.ts index 90ed8fbbe51..2e222ed4a84 100644 --- a/apps/sim/lib/copilot/tools/server/jobs/get-job-logs.ts +++ b/apps/sim/lib/copilot/tools/server/jobs/get-job-logs.ts @@ -105,11 +105,12 @@ export const getJobLogsServerTool: BaseServerTool } const wsId = workspaceId || context.workspaceId - if (wsId) { - const access = await checkWorkspaceAccess(wsId, context.userId) - if (!access.hasAccess) { - throw new Error('Unauthorized workspace access') - } + if (!wsId) { + throw new Error('Workspace context required') + } + const access = await checkWorkspaceAccess(wsId, context.userId) + if (!access.hasAccess) { + throw new Error('Unauthorized workspace access') } const clampedLimit = Math.min(Math.max(1, limit), 5) @@ -121,7 +122,10 @@ export const getJobLogsServerTool: BaseServerTool includeDetails, }) - const conditions = [eq(jobExecutionLogs.scheduleId, jobId)] + const conditions = [ + eq(jobExecutionLogs.scheduleId, jobId), + eq(jobExecutionLogs.workspaceId, wsId), + ] if (executionId) { conditions.push(eq(jobExecutionLogs.executionId, executionId)) } diff --git a/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts b/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts index 64d6561273b..46d6062eb93 100644 --- a/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts +++ b/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts @@ -37,6 +37,11 @@ import { import { StorageService } from '@/lib/uploads' import { resolveWorkspaceFileReference } from '@/lib/uploads/contexts/workspace/workspace-file-manager' import { getQueryStrategy, handleVectorOnlySearch } from '@/app/api/knowledge/search/utils' +import { + checkDocumentWriteAccess, + checkKnowledgeBaseAccess, + checkKnowledgeBaseWriteAccess, +} from '@/app/api/knowledge/utils' const logger = 
createLogger('KnowledgeBaseServerTool') @@ -141,6 +146,14 @@ export const knowledgeBaseServerTool: BaseServerTool = { connectorType: args.connectorType, sourceConfig: args.sourceConfig ?? {}, @@ -762,6 +892,11 @@ export const knowledgeBaseServerTool: BaseServerTool = {} if (args.sourceConfig !== undefined) updateBody.sourceConfig = args.sourceConfig if (args.syncIntervalMinutes !== undefined) @@ -810,6 +945,11 @@ export const knowledgeBaseServerTool: BaseServerTool if (!args.tableId) { return { success: false, message: 'Table ID is required' } } + if (!workspaceId) { + return { success: false, message: 'Workspace ID is required' } + } const table = await getTableById(args.tableId) - if (!table) { + if (!table || table.workspaceId !== workspaceId) { return { success: false, message: `Table not found: ${args.tableId}` } } @@ -240,9 +243,12 @@ export const userTableServerTool: BaseServerTool if (!args.tableId) { return { success: false, message: 'Table ID is required' } } + if (!workspaceId) { + return { success: false, message: 'Workspace ID is required' } + } const table = await getTableById(args.tableId) - if (!table) { + if (!table || table.workspaceId !== workspaceId) { return { success: false, message: `Table not found: ${args.tableId}` } } @@ -816,6 +822,9 @@ export const userTableServerTool: BaseServerTool if (!args.tableId) { return { success: false, message: 'Table ID is required' } } + if (!workspaceId) { + return { success: false, message: 'Workspace ID is required' } + } const col = (args as Record).column as | { name: string @@ -830,6 +839,10 @@ export const userTableServerTool: BaseServerTool message: 'column with name and type is required for add_column', } } + const tableForAdd = await getTableById(args.tableId) + if (!tableForAdd || tableForAdd.workspaceId !== workspaceId) { + return { success: false, message: `Table not found: ${args.tableId}` } + } const requestId = generateId().slice(0, 8) assertNotAborted() const updated = await addTableColumn(args.tableId, col, requestId) @@ -844,11 +857,18 @@ export const userTableServerTool: BaseServerTool if (!args.tableId) { return { success: false, message: 'Table ID is required' } } + if (!workspaceId) { + return { success: false, message: 'Workspace ID is required' } + } const colName = (args as Record).columnName as string | undefined const newColName = (args as Record).newName as string | undefined if (!colName || !newColName) { return { success: false, message: 'columnName and newName are required' } } + const tableForRename = await getTableById(args.tableId) + if (!tableForRename || tableForRename.workspaceId !== workspaceId) { + return { success: false, message: `Table not found: ${args.tableId}` } + } const requestId = generateId().slice(0, 8) assertNotAborted() const updated = await renameColumn( @@ -866,12 +886,19 @@ export const userTableServerTool: BaseServerTool if (!args.tableId) { return { success: false, message: 'Table ID is required' } } + if (!workspaceId) { + return { success: false, message: 'Workspace ID is required' } + } const colName = (args as Record).columnName as string | undefined const colNames = (args as Record).columnNames as string[] | undefined const names = colNames ?? (colName ? 
[colName] : null) if (!names || names.length === 0) { return { success: false, message: 'columnName or columnNames is required' } } + const tableForDelete = await getTableById(args.tableId) + if (!tableForDelete || tableForDelete.workspaceId !== workspaceId) { + return { success: false, message: `Table not found: ${args.tableId}` } + } const requestId = generateId().slice(0, 8) if (names.length === 1) { assertNotAborted() @@ -901,6 +928,9 @@ export const userTableServerTool: BaseServerTool if (!args.tableId) { return { success: false, message: 'Table ID is required' } } + if (!workspaceId) { + return { success: false, message: 'Workspace ID is required' } + } const colName = (args as Record).columnName as string | undefined if (!colName) { return { success: false, message: 'columnName is required' } @@ -913,6 +943,10 @@ export const userTableServerTool: BaseServerTool message: 'At least one of newType or unique must be provided', } } + const tableForUpdate = await getTableById(args.tableId) + if (!tableForUpdate || tableForUpdate.workspaceId !== workspaceId) { + return { success: false, message: `Table not found: ${args.tableId}` } + } const requestId = generateId().slice(0, 8) let result: TableDefinition | undefined if (newType !== undefined) { diff --git a/apps/sim/lib/oauth/oauth.ts b/apps/sim/lib/oauth/oauth.ts index 65afa3a30a8..9f12b4c3f60 100644 --- a/apps/sim/lib/oauth/oauth.ts +++ b/apps/sim/lib/oauth/oauth.ts @@ -704,7 +704,7 @@ export const OAUTH_PROVIDERS: Record = { services: { slack: { name: 'Slack', - description: 'Send messages using a bot for Slack.', + description: 'Use Slack messaging, files, reactions, views, and canvases.', providerId: 'slack', icon: SlackIcon, baseProviderIcon: SlackIcon, @@ -722,6 +722,7 @@ export const OAUTH_PROVIDERS: Record = { // TODO: Add 'users:read.email' once Slack app review is approved 'files:write', 'files:read', + 'canvases:read', 'canvases:write', 'reactions:write', ], diff --git a/apps/sim/lib/oauth/utils.ts b/apps/sim/lib/oauth/utils.ts index bd6173b2b90..5db26dbc0d1 100644 --- a/apps/sim/lib/oauth/utils.ts +++ b/apps/sim/lib/oauth/utils.ts @@ -278,7 +278,8 @@ export const SCOPE_DESCRIPTIONS: Record = { 'users:read.email': 'View user email addresses', 'files:write': 'Upload files', 'files:read': 'Download and read files', - 'canvases:write': 'Create canvas documents', + 'canvases:read': 'Read canvas sections', + 'canvases:write': 'Create, edit, and delete canvas documents', 'reactions:write': 'Add emoji reactions to messages', // Webflow scopes diff --git a/apps/sim/lib/uploads/core/upload-token.ts b/apps/sim/lib/uploads/core/upload-token.ts new file mode 100644 index 00000000000..b6890431bcf --- /dev/null +++ b/apps/sim/lib/uploads/core/upload-token.ts @@ -0,0 +1,87 @@ +import { safeCompare } from '@sim/security/compare' +import { hmacSha256Base64 } from '@sim/security/hmac' +import { env } from '@/lib/core/config/env' +import type { StorageContext } from '@/lib/uploads/shared/types' + +export interface UploadTokenPayload { + uploadId: string + key: string + userId: string + workspaceId: string + context: StorageContext +} + +interface SignedPayload extends UploadTokenPayload { + exp: number + v: 1 +} + +const toBase64Url = (input: string): string => Buffer.from(input, 'utf8').toString('base64url') + +const fromBase64Url = (input: string): string => Buffer.from(input, 'base64url').toString('utf8') + +const sign = (payload: string): string => hmacSha256Base64(payload, env.INTERNAL_API_SECRET) + +/** + * Sign an upload session token 
binding (uploadId, key, userId, workspaceId, context). + * Used to prevent IDOR on multipart upload follow-up calls (get-part-urls, complete, abort). + */ +export function signUploadToken(payload: UploadTokenPayload, expiresInSeconds = 60 * 60): string { + const signed: SignedPayload = { + ...payload, + exp: Math.floor(Date.now() / 1000) + expiresInSeconds, + v: 1, + } + const encoded = toBase64Url(JSON.stringify(signed)) + return `${encoded}.${sign(encoded)}` +} + +export type UploadTokenVerification = + | { valid: true; payload: UploadTokenPayload } + | { valid: false } + +export function verifyUploadToken(token: string): UploadTokenVerification { + if (typeof token !== 'string') { + return { valid: false } + } + const parts = token.split('.') + if (parts.length !== 2) return { valid: false } + const [encoded, signature] = parts + if (!encoded || !signature) return { valid: false } + + const expected = sign(encoded) + if (!safeCompare(signature, expected)) { + return { valid: false } + } + + let parsed: SignedPayload + try { + parsed = JSON.parse(fromBase64Url(encoded)) as SignedPayload + } catch { + return { valid: false } + } + + if ( + parsed.v !== 1 || + typeof parsed.exp !== 'number' || + parsed.exp < Math.floor(Date.now() / 1000) || + typeof parsed.uploadId !== 'string' || + typeof parsed.key !== 'string' || + typeof parsed.userId !== 'string' || + typeof parsed.workspaceId !== 'string' || + typeof parsed.context !== 'string' + ) { + return { valid: false } + } + + return { + valid: true, + payload: { + uploadId: parsed.uploadId, + key: parsed.key, + userId: parsed.userId, + workspaceId: parsed.workspaceId, + context: parsed.context as StorageContext, + }, + } +} diff --git a/apps/sim/lib/workflows/utils.ts b/apps/sim/lib/workflows/utils.ts index a6b971f932a..318d6249d6a 100644 --- a/apps/sim/lib/workflows/utils.ts +++ b/apps/sim/lib/workflows/utils.ts @@ -564,6 +564,18 @@ export async function updateFolderRecord( await db.update(workflowFolder).set(setData).where(eq(workflowFolder.id, folderId)) } +export async function verifyFolderWorkspace( + folderId: string, + workspaceId: string +): Promise { + const [row] = await db + .select({ id: workflowFolder.id }) + .from(workflowFolder) + .where(and(eq(workflowFolder.id, folderId), eq(workflowFolder.workspaceId, workspaceId))) + .limit(1) + return Boolean(row) +} + export async function deleteFolderRecord(folderId: string): Promise { const [folder] = await db .select({ parentId: workflowFolder.parentId }) diff --git a/apps/sim/tools/registry.ts b/apps/sim/tools/registry.ts index 12cdda7d229..1d58ccebdc4 100644 --- a/apps/sim/tools/registry.ts +++ b/apps/sim/tools/registry.ts @@ -2364,19 +2364,23 @@ import { slackCanvasTool, slackCreateChannelCanvasTool, slackCreateConversationTool, + slackDeleteCanvasTool, slackDeleteMessageTool, slackDownloadTool, slackEditCanvasTool, slackEphemeralMessageTool, + slackGetCanvasTool, slackGetChannelInfoTool, slackGetMessageTool, slackGetThreadTool, slackGetUserPresenceTool, slackGetUserTool, slackInviteToConversationTool, + slackListCanvasesTool, slackListChannelsTool, slackListMembersTool, slackListUsersTool, + slackLookupCanvasSectionsTool, slackMessageReaderTool, slackMessageTool, slackOpenViewTool, @@ -3360,6 +3364,10 @@ export const tools: Record = { slack_publish_view: slackPublishViewTool, slack_edit_canvas: slackEditCanvasTool, slack_create_channel_canvas: slackCreateChannelCanvasTool, + slack_get_canvas: slackGetCanvasTool, + slack_list_canvases: slackListCanvasesTool, + 
slack_lookup_canvas_sections: slackLookupCanvasSectionsTool, + slack_delete_canvas: slackDeleteCanvasTool, slack_create_conversation: slackCreateConversationTool, slack_invite_to_conversation: slackInviteToConversationTool, github_repo_info: githubRepoInfoTool, diff --git a/apps/sim/tools/slack/delete_canvas.ts b/apps/sim/tools/slack/delete_canvas.ts new file mode 100644 index 00000000000..903f38dbb47 --- /dev/null +++ b/apps/sim/tools/slack/delete_canvas.ts @@ -0,0 +1,79 @@ +import type { SlackDeleteCanvasParams, SlackDeleteCanvasResponse } from '@/tools/slack/types' +import type { ToolConfig } from '@/tools/types' + +export const slackDeleteCanvasTool: ToolConfig<SlackDeleteCanvasParams, SlackDeleteCanvasResponse> = + { + id: 'slack_delete_canvas', + name: 'Slack Delete Canvas', + description: 'Delete a Slack canvas by its canvas ID', + version: '1.0.0', + + oauth: { + required: true, + provider: 'slack', + }, + + params: { + authMethod: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Authentication method: oauth or bot_token', + }, + botToken: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Bot token for Custom Bot', + }, + accessToken: { + type: 'string', + required: false, + visibility: 'hidden', + description: 'OAuth access token or bot token for Slack API', + }, + canvasId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Canvas ID to delete (e.g., F1234ABCD)', + }, + }, + + request: { + url: 'https://slack.com/api/canvases.delete', + method: 'POST', + headers: (params: SlackDeleteCanvasParams) => ({ + 'Content-Type': 'application/json', + Authorization: `Bearer ${params.accessToken || params.botToken}`, + }), + body: (params: SlackDeleteCanvasParams) => ({ + canvas_id: params.canvasId.trim(), + }), + }, + + transformResponse: async (response: Response) => { + const data = await response.json() + + if (!data.ok) { + if (data.error === 'canvas_not_found') { + throw new Error('Canvas not found or not visible to the authenticated Slack user or bot.') + } + if (data.error === 'canvas_deleting_disabled') { + throw new Error('Canvas deletion is disabled for this workspace.') + } + throw new Error(data.error || 'Failed to delete canvas') + } + + return { + success: true, + output: { + ok: data.ok, + }, + } + }, + + outputs: { + ok: { type: 'boolean', description: 'Whether Slack deleted the canvas successfully' }, + }, + } diff --git a/apps/sim/tools/slack/get_canvas.ts b/apps/sim/tools/slack/get_canvas.ts new file mode 100644 index 00000000000..f4bcc97f847 --- /dev/null +++ b/apps/sim/tools/slack/get_canvas.ts @@ -0,0 +1,85 @@ +import type { SlackGetCanvasParams, SlackGetCanvasResponse } from '@/tools/slack/types' +import { CANVAS_FILE_OUTPUT_PROPERTIES } from '@/tools/slack/types' +import { mapCanvasFile } from '@/tools/slack/utils' +import type { ToolConfig } from '@/tools/types' + +export const slackGetCanvasTool: ToolConfig<SlackGetCanvasParams, SlackGetCanvasResponse> = { + id: 'slack_get_canvas', + name: 'Slack Get Canvas Info', + description: 'Get Slack canvas file metadata by canvas ID', + version: '1.0.0', + + oauth: { + required: true, + provider: 'slack', + }, + + params: { + authMethod: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Authentication method: oauth or bot_token', + }, + botToken: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Bot token for Custom Bot', + }, + accessToken: { + type: 'string', + required: false, + visibility: 'hidden', + description: 'OAuth access token or bot token for Slack
API', + }, + canvasId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Canvas file ID to retrieve (e.g., F1234ABCD)', + }, + }, + + request: { + url: (params: SlackGetCanvasParams) => { + const url = new URL('https://slack.com/api/files.info') + url.searchParams.append('file', params.canvasId.trim()) + return url.toString() + }, + method: 'GET', + headers: (params: SlackGetCanvasParams) => ({ + 'Content-Type': 'application/json', + Authorization: `Bearer ${params.accessToken || params.botToken}`, + }), + }, + + transformResponse: async (response: Response) => { + const data = await response.json() + + if (!data.ok) { + if (data.error === 'file_not_found') { + throw new Error('Canvas not found. Please check the canvas ID and try again.') + } + if (data.error === 'not_visible') { + throw new Error('Canvas is not visible to the authenticated Slack user or bot.') + } + throw new Error(data.error || 'Failed to get canvas from Slack') + } + + return { + success: true, + output: { + canvas: mapCanvasFile(data.file), + }, + } + }, + + outputs: { + canvas: { + type: 'object', + description: 'Canvas file information returned by Slack', + properties: CANVAS_FILE_OUTPUT_PROPERTIES, + }, + }, +} diff --git a/apps/sim/tools/slack/index.ts b/apps/sim/tools/slack/index.ts index 6ea056ac877..9a075e6db9b 100644 --- a/apps/sim/tools/slack/index.ts +++ b/apps/sim/tools/slack/index.ts @@ -2,19 +2,23 @@ import { slackAddReactionTool } from '@/tools/slack/add_reaction' import { slackCanvasTool } from '@/tools/slack/canvas' import { slackCreateChannelCanvasTool } from '@/tools/slack/create_channel_canvas' import { slackCreateConversationTool } from '@/tools/slack/create_conversation' +import { slackDeleteCanvasTool } from '@/tools/slack/delete_canvas' import { slackDeleteMessageTool } from '@/tools/slack/delete_message' import { slackDownloadTool } from '@/tools/slack/download' import { slackEditCanvasTool } from '@/tools/slack/edit_canvas' import { slackEphemeralMessageTool } from '@/tools/slack/ephemeral_message' +import { slackGetCanvasTool } from '@/tools/slack/get_canvas' import { slackGetChannelInfoTool } from '@/tools/slack/get_channel_info' import { slackGetMessageTool } from '@/tools/slack/get_message' import { slackGetThreadTool } from '@/tools/slack/get_thread' import { slackGetUserTool } from '@/tools/slack/get_user' import { slackGetUserPresenceTool } from '@/tools/slack/get_user_presence' import { slackInviteToConversationTool } from '@/tools/slack/invite_to_conversation' +import { slackListCanvasesTool } from '@/tools/slack/list_canvases' import { slackListChannelsTool } from '@/tools/slack/list_channels' import { slackListMembersTool } from '@/tools/slack/list_members' import { slackListUsersTool } from '@/tools/slack/list_users' +import { slackLookupCanvasSectionsTool } from '@/tools/slack/lookup_canvas_sections' import { slackMessageTool } from '@/tools/slack/message' import { slackMessageReaderTool } from '@/tools/slack/message_reader' import { slackOpenViewTool } from '@/tools/slack/open_view' @@ -29,6 +33,10 @@ export { slackCanvasTool, slackCreateConversationTool, slackCreateChannelCanvasTool, + slackGetCanvasTool, + slackListCanvasesTool, + slackLookupCanvasSectionsTool, + slackDeleteCanvasTool, slackMessageReaderTool, slackDownloadTool, slackEditCanvasTool, @@ -51,3 +59,5 @@ export { slackGetThreadTool, slackInviteToConversationTool, } + +export * from './types' diff --git a/apps/sim/tools/slack/list_canvases.ts b/apps/sim/tools/slack/list_canvases.ts new 
file mode 100644 index 00000000000..c473835e795 --- /dev/null +++ b/apps/sim/tools/slack/list_canvases.ts @@ -0,0 +1,142 @@ +import type { SlackListCanvasesParams, SlackListCanvasesResponse } from '@/tools/slack/types' +import { CANVAS_FILE_OUTPUT_PROPERTIES, CANVAS_PAGING_OUTPUT_PROPERTIES } from '@/tools/slack/types' +import { mapCanvasFile } from '@/tools/slack/utils' +import type { ToolConfig } from '@/tools/types' + +export const slackListCanvasesTool: ToolConfig<SlackListCanvasesParams, SlackListCanvasesResponse> = + { + id: 'slack_list_canvases', + name: 'Slack List Canvases', + description: 'List Slack canvases available to the authenticated user or bot', + version: '1.0.0', + + oauth: { + required: true, + provider: 'slack', + }, + + params: { + authMethod: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Authentication method: oauth or bot_token', + }, + botToken: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Bot token for Custom Bot', + }, + accessToken: { + type: 'string', + required: false, + visibility: 'hidden', + description: 'OAuth access token or bot token for Slack API', + }, + channel: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter canvases appearing in a specific channel ID', + }, + count: { + type: 'number', + required: false, + visibility: 'user-or-llm', + description: 'Number of canvases to return per page', + }, + page: { + type: 'number', + required: false, + visibility: 'user-or-llm', + description: 'Page number to return', + }, + user: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter canvases created by a single user ID', + }, + tsFrom: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter canvases created after this Unix timestamp', + }, + tsTo: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter canvases created before this Unix timestamp', + }, + teamId: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Encoded team ID, required when using an org-level token', + }, + }, + + request: { + url: (params: SlackListCanvasesParams) => { + const url = new URL('https://slack.com/api/files.list') + url.searchParams.append('types', 'canvas') + + if (params.channel) url.searchParams.append('channel', params.channel.trim()) + if (params.count) url.searchParams.append('count', String(params.count)) + if (params.page) url.searchParams.append('page', String(params.page)) + if (params.user) url.searchParams.append('user', params.user.trim()) + if (params.tsFrom) url.searchParams.append('ts_from', params.tsFrom.trim()) + if (params.tsTo) url.searchParams.append('ts_to', params.tsTo.trim()) + if (params.teamId) url.searchParams.append('team_id', params.teamId.trim()) + + return url.toString() + }, + method: 'GET', + headers: (params: SlackListCanvasesParams) => ({ + 'Content-Type': 'application/json', + Authorization: `Bearer ${params.accessToken || params.botToken}`, + }), + }, + + transformResponse: async (response: Response) => { + const data = await response.json() + + if (!data.ok) { + if (data.error === 'unknown_type') { + throw new Error('Slack did not recognize the canvas file type filter.') + } + throw new Error(data.error || 'Failed to list canvases from Slack') + } + + return { + success: true, + output: { + canvases: (data.files ?? []).map(mapCanvasFile), + paging: { + count: data.paging?.count ?? 0, + total: data.paging?.total ??
0, + page: data.paging?.page ?? 0, + pages: data.paging?.pages ?? 0, + }, + }, + } + }, + + outputs: { + canvases: { + type: 'array', + description: 'Canvas file objects returned by Slack', + items: { + type: 'object', + properties: CANVAS_FILE_OUTPUT_PROPERTIES, + }, + }, + paging: { + type: 'object', + description: 'Pagination information from Slack', + properties: CANVAS_PAGING_OUTPUT_PROPERTIES, + }, + }, + } diff --git a/apps/sim/tools/slack/lookup_canvas_sections.ts b/apps/sim/tools/slack/lookup_canvas_sections.ts new file mode 100644 index 00000000000..5f48c8a52a0 --- /dev/null +++ b/apps/sim/tools/slack/lookup_canvas_sections.ts @@ -0,0 +1,114 @@ +import type { + SlackLookupCanvasSectionsParams, + SlackLookupCanvasSectionsResponse, +} from '@/tools/slack/types' +import { CANVAS_SECTION_OUTPUT_PROPERTIES } from '@/tools/slack/types' +import type { ToolConfig } from '@/tools/types' + +const parseCriteria = (criteria: SlackLookupCanvasSectionsParams['criteria']) => { + if (typeof criteria !== 'string') { + return criteria + } + + try { + return JSON.parse(criteria) + } catch { + throw new Error('Canvas section criteria must be a valid JSON object') + } +} + +export const slackLookupCanvasSectionsTool: ToolConfig< + SlackLookupCanvasSectionsParams, + SlackLookupCanvasSectionsResponse +> = { + id: 'slack_lookup_canvas_sections', + name: 'Slack Lookup Canvas Sections', + description: 'Find Slack canvas section IDs matching criteria for later edits', + version: '1.0.0', + + oauth: { + required: true, + provider: 'slack', + }, + + params: { + authMethod: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Authentication method: oauth or bot_token', + }, + botToken: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Bot token for Custom Bot', + }, + accessToken: { + type: 'string', + required: false, + visibility: 'hidden', + description: 'OAuth access token or bot token for Slack API', + }, + canvasId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Canvas ID to search (e.g., F1234ABCD)', + }, + criteria: { + type: 'json', + required: true, + visibility: 'user-or-llm', + description: + 'Section lookup criteria, such as {"section_types":["h1"],"contains_text":"Roadmap"}', + }, + }, + + request: { + url: 'https://slack.com/api/canvases.sections.lookup', + method: 'POST', + headers: (params: SlackLookupCanvasSectionsParams) => ({ + 'Content-Type': 'application/json', + Authorization: `Bearer ${params.accessToken || params.botToken}`, + }), + body: (params: SlackLookupCanvasSectionsParams) => ({ + canvas_id: params.canvasId.trim(), + criteria: parseCriteria(params.criteria), + }), + }, + + transformResponse: async (response: Response) => { + const data = await response.json() + + if (!data.ok) { + if (data.error === 'canvas_not_found') { + throw new Error('Canvas not found or not visible to the authenticated Slack user or bot.') + } + if (data.error === 'missing_scope') { + throw new Error( + 'Missing required permissions. Please reconnect your Slack account with the canvases:read scope.' + ) + } + throw new Error(data.error || 'Failed to look up canvas sections') + } + + return { + success: true, + output: { + sections: data.sections ?? 
[], + }, + } + }, + + outputs: { + sections: { + type: 'array', + description: 'Canvas sections matching the lookup criteria', + items: { + type: 'object', + properties: CANVAS_SECTION_OUTPUT_PROPERTIES, + }, + }, + }, +} diff --git a/apps/sim/tools/slack/types.ts b/apps/sim/tools/slack/types.ts index 0a5e4a70c55..8e193363fb0 100644 --- a/apps/sim/tools/slack/types.ts +++ b/apps/sim/tools/slack/types.ts @@ -488,6 +488,91 @@ export const CANVAS_OUTPUT_PROPERTIES = { title: { type: 'string', description: 'Canvas title' }, } as const satisfies Record +/** + * Canvas file object output properties. + * Based on Slack file objects returned by files.info and files.list for canvases. + */ +export const CANVAS_FILE_OUTPUT_PROPERTIES = { + id: { type: 'string', description: 'Unique canvas file identifier' }, + created: { type: 'number', description: 'Unix timestamp when the canvas was created' }, + timestamp: { type: 'number', description: 'Unix timestamp associated with the canvas' }, + name: { type: 'string', description: 'Canvas file name', optional: true }, + title: { type: 'string', description: 'Canvas title', optional: true }, + mimetype: { type: 'string', description: 'MIME type of the canvas file', optional: true }, + filetype: { type: 'string', description: 'Slack file type for the canvas', optional: true }, + pretty_type: { type: 'string', description: 'Human-readable file type', optional: true }, + user: { type: 'string', description: 'User ID of the canvas creator', optional: true }, + editable: { type: 'boolean', description: 'Whether the canvas file is editable', optional: true }, + size: { type: 'number', description: 'Canvas file size in bytes', optional: true }, + mode: { type: 'string', description: 'File mode', optional: true }, + is_external: { + type: 'boolean', + description: 'Whether the canvas is externally hosted', + optional: true, + }, + is_public: { type: 'boolean', description: 'Whether the canvas is public', optional: true }, + url_private: { + type: 'string', + description: 'Private URL for the canvas file', + optional: true, + }, + url_private_download: { + type: 'string', + description: 'Private download URL for the canvas file', + optional: true, + }, + permalink: { type: 'string', description: 'Permanent URL for the canvas', optional: true }, + channels: { + type: 'array', + description: 'Public channel IDs where the canvas appears', + items: { type: 'string', description: 'Channel ID' }, + optional: true, + }, + groups: { + type: 'array', + description: 'Private channel IDs where the canvas appears', + items: { type: 'string', description: 'Channel ID' }, + optional: true, + }, + ims: { + type: 'array', + description: 'Direct message IDs where the canvas appears', + items: { type: 'string', description: 'Conversation ID' }, + optional: true, + }, + canvas_readtime: { + type: 'number', + description: 'Approximate read time for canvas content', + optional: true, + }, + is_channel_space: { + type: 'boolean', + description: 'Whether this canvas is linked to a channel', + optional: true, + }, + linked_channel_id: { + type: 'string', + description: 'Channel ID linked to this canvas', + optional: true, + }, + canvas_creator_id: { + type: 'string', + description: 'User ID of the canvas creator', + optional: true, + }, +} as const satisfies Record + +export const CANVAS_PAGING_OUTPUT_PROPERTIES = { + count: { type: 'number', description: 'Number of items requested per page' }, + total: { type: 'number', description: 'Total number of matching files' }, + page: { type: 
'number', description: 'Current page number' }, + pages: { type: 'number', description: 'Total number of pages' }, +} as const satisfies Record + +export const CANVAS_SECTION_OUTPUT_PROPERTIES = { + id: { type: 'string', description: 'Canvas section identifier' }, +} as const satisfies Record + /** * Output definition for modal view objects * Based on Slack views.open response structure @@ -735,6 +820,29 @@ export interface SlackCreateChannelCanvasParams extends SlackBaseParams { content?: string } +export interface SlackGetCanvasParams extends SlackBaseParams { + canvasId: string +} + +export interface SlackListCanvasesParams extends SlackBaseParams { + channel?: string + count?: number + page?: number + user?: string + tsFrom?: string + tsTo?: string + teamId?: string +} + +export interface SlackLookupCanvasSectionsParams extends SlackBaseParams { + canvasId: string + criteria: Record | string +} + +export interface SlackDeleteCanvasParams extends SlackBaseParams { + canvasId: string +} + export interface SlackOpenViewParams extends SlackBaseParams { triggerId: string interactivityPointer?: string @@ -1078,6 +1186,69 @@ export interface SlackCreateChannelCanvasResponse extends ToolResponse { } } +export interface SlackCanvasFile { + id: string + created: number | null + timestamp: number | null + name?: string | null + title?: string | null + mimetype?: string | null + filetype?: string | null + pretty_type?: string | null + user?: string | null + editable?: boolean | null + size?: number | null + mode?: string | null + is_external?: boolean | null + is_public?: boolean | null + url_private?: string | null + url_private_download?: string | null + permalink?: string | null + channels?: string[] + groups?: string[] + ims?: string[] + canvas_readtime?: number | null + is_channel_space?: boolean | null + linked_channel_id?: string | null + canvas_creator_id?: string | null +} + +export interface SlackCanvasPaging { + count: number + total: number + page: number + pages: number +} + +export interface SlackCanvasSection { + id: string +} + +export interface SlackGetCanvasResponse extends ToolResponse { + output: { + canvas: SlackCanvasFile + } +} + +export interface SlackListCanvasesResponse extends ToolResponse { + output: { + canvases: SlackCanvasFile[] + paging: SlackCanvasPaging + } +} + +export interface SlackLookupCanvasSectionsResponse extends ToolResponse { + output: { + sections: SlackCanvasSection[] + } +} + +export interface SlackDeleteCanvasResponse extends ToolResponse { + output: { + ok: boolean + } +} + export interface SlackView { id: string team_id?: string | null @@ -1143,6 +1314,10 @@ export type SlackResponse = | SlackGetUserPresenceResponse | SlackEditCanvasResponse | SlackCreateChannelCanvasResponse + | SlackGetCanvasResponse + | SlackListCanvasesResponse + | SlackLookupCanvasSectionsResponse + | SlackDeleteCanvasResponse | SlackCreateConversationResponse | SlackInviteToConversationResponse | SlackOpenViewResponse diff --git a/apps/sim/tools/slack/utils.ts b/apps/sim/tools/slack/utils.ts new file mode 100644 index 00000000000..a6e2849dd15 --- /dev/null +++ b/apps/sim/tools/slack/utils.ts @@ -0,0 +1,28 @@ +import type { SlackCanvasFile } from '@/tools/slack/types' + +export const mapCanvasFile = (file: SlackCanvasFile): SlackCanvasFile => ({ + id: file.id, + created: file.created ?? null, + timestamp: file.timestamp ?? null, + name: file.name ?? null, + title: file.title ?? null, + mimetype: file.mimetype ?? null, + filetype: file.filetype ?? 
null, + pretty_type: file.pretty_type ?? null, + user: file.user ?? null, + editable: file.editable ?? null, + size: file.size ?? null, + mode: file.mode ?? null, + is_external: file.is_external ?? null, + is_public: file.is_public ?? null, + url_private: file.url_private ?? null, + url_private_download: file.url_private_download ?? null, + permalink: file.permalink ?? null, + channels: file.channels ?? [], + groups: file.groups ?? [], + ims: file.ims ?? [], + canvas_readtime: file.canvas_readtime ?? null, + is_channel_space: file.is_channel_space ?? null, + linked_channel_id: file.linked_channel_id ?? null, + canvas_creator_id: file.canvas_creator_id ?? null, +})
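Reviewer note: a minimal sketch of the intended upload-token flow, assuming the exports of apps/sim/lib/uploads/core/upload-token.ts above and a configured INTERNAL_API_SECRET; the payload values, the 'workspace' context, and the tamper check are illustrative, not part of the diff.

```typescript
import { signUploadToken, verifyUploadToken } from '@/lib/uploads/core/upload-token'

// The initiate step signs the session once; the follow-up calls named in
// the module's doc comment (get-part-urls, complete, abort) only carry
// the opaque token.
const token = signUploadToken({
  uploadId: 'upload-123', // hypothetical IDs for illustration
  key: 'workspace/ws-1/file.bin',
  userId: 'user-1',
  workspaceId: 'ws-1',
  context: 'workspace', // assumed to be a valid StorageContext value
})

const result = verifyUploadToken(token)
if (result.valid) {
  // A verifier re-derives uploadId/key/workspaceId/context from the signed
  // payload instead of trusting client-supplied values, which is what
  // closes the IDOR the doc comment describes.
  console.log(result.payload.uploadId) // 'upload-123'
}

// Any mutation breaks the HMAC, so verification should fail closed.
console.log(verifyUploadToken(`${token}x`).valid) // false
```

Because the payload also binds userId and carries exp (one hour by default), a leaked token is scoped to the account that minted it and expires on its own.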
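Reviewer note: the restore and folder handlers above all converge on the same guard shape. A self-contained sketch of that pattern follows, with a stub standing in for the DB-backed getUserEntityPermissions lookup; everything except the names taken from the diff is hypothetical.

```typescript
type Permission = 'read' | 'write' | 'admin' | null

// Stand-in for getUserEntityPermissions(userId, 'workspace', workspaceId),
// which in the real code resolves the caller's permission on the workspace.
const getUserEntityPermissions = async (
  _userId: string,
  _entity: 'workspace',
  _workspaceId: string
): Promise<Permission> => 'write'

async function hasWriteAccess(
  userId: string,
  callerWorkspaceId: string,
  resourceWorkspaceId: string | null | undefined
): Promise<boolean> {
  // Fail closed on cross-workspace IDs before any permission query runs.
  if (!resourceWorkspaceId || resourceWorkspaceId !== callerWorkspaceId) return false
  const permission = await getUserEntityPermissions(userId, 'workspace', resourceWorkspaceId)
  return permission === 'write' || permission === 'admin'
}
```

One detail worth keeping: on failure these handlers report "not found" rather than "forbidden", so a caller probing foreign IDs cannot distinguish a resource in someone else's workspace from one that does not exist.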
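Reviewer note: slack_lookup_canvas_sections accepts criteria either as an object or as a JSON string, presumably because user-or-llm params often arrive stringified. A quick sketch of what parseCriteria tolerates, mirroring the code and error message above:

```typescript
// Objects pass through untouched; strings must parse as JSON; anything
// unparseable throws before the Slack API is ever called.
const parseCriteria = (criteria: Record<string, unknown> | string) => {
  if (typeof criteria !== 'string') {
    return criteria
  }
  try {
    return JSON.parse(criteria)
  } catch {
    throw new Error('Canvas section criteria must be a valid JSON object')
  }
}

// Both forms yield the same canvases.sections.lookup request body.
parseCriteria({ section_types: ['h1'], contains_text: 'Roadmap' })
parseCriteria('{"section_types":["h1"],"contains_text":"Roadmap"}')
// parseCriteria('not json') // throws: Canvas section criteria must be a valid JSON object
```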
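Reviewer note: mapCanvasFile pins every optional Slack file field to an explicit null (and the channel lists to []), so tool outputs never contain undefined. A usage sketch with a deliberately sparse files.info-style payload; the ID and timestamp are invented, and the claim that Slack may omit these fields is an assumption about its API, not something the diff asserts.

```typescript
import type { SlackCanvasFile } from '@/tools/slack/types'
import { mapCanvasFile } from '@/tools/slack/utils'

// Assumed: Slack can return a payload this thin depending on token
// visibility and file type; only id/created/timestamp are required here.
const sparse: SlackCanvasFile = {
  id: 'F1234ABCD', // hypothetical canvas ID
  created: 1700000000,
  timestamp: null,
}

const canvas = mapCanvasFile(sparse)
console.log(canvas.title) // null, never undefined
console.log(canvas.channels) // [] rather than a missing key
console.log(canvas.created) // 1700000000, preserved as-is
```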