diff --git a/apps/sim/app/api/table/[tableId]/import/route.test.ts b/apps/sim/app/api/table/[tableId]/import/route.test.ts index a77da89c52e..438f74e035e 100644 --- a/apps/sim/app/api/table/[tableId]/import/route.test.ts +++ b/apps/sim/app/api/table/[tableId]/import/route.test.ts @@ -8,17 +8,15 @@ import type { TableDefinition } from '@/lib/table' const { mockCheckAccess, - mockBatchInsertRowsWithTx, - mockReplaceTableRowsWithTx, - mockAddTableColumnsWithTx, + mockImportAppendRows, + mockImportReplaceRows, mockDispatchAfterBatchInsert, mockMarkTableImporting, mockReleaseImportClaim, } = vi.hoisted(() => ({ mockCheckAccess: vi.fn(), - mockBatchInsertRowsWithTx: vi.fn(), - mockReplaceTableRowsWithTx: vi.fn(), - mockAddTableColumnsWithTx: vi.fn(), + mockImportAppendRows: vi.fn(), + mockImportReplaceRows: vi.fn(), mockDispatchAfterBatchInsert: vi.fn(), mockMarkTableImporting: vi.fn(), mockReleaseImportClaim: vi.fn(), @@ -47,15 +45,15 @@ vi.mock('@/app/api/table/utils', async () => { }) /** - * The route imports `batchInsertRows` and `replaceTableRows` from the barrel, - * which forwards them from `./service`. Mocking the service module replaces - * both without having to touch the other real helpers (`parseCsvBuffer`, - * `coerceRowsForTable`, etc.) exported through the barrel. + * The route imports `importAppendRows` / `importReplaceRows` from the barrel, + * which forwards them from `./service`. These functions own the import + * transaction (column adds + row writes); mocking the service module replaces + * them without touching the other real helpers (`coerceRowsForTable`, + * `createCsvParser`, etc.) exported through the barrel. */ vi.mock('@/lib/table/service', () => ({ - batchInsertRowsWithTx: mockBatchInsertRowsWithTx, - replaceTableRowsWithTx: mockReplaceTableRowsWithTx, - addTableColumnsWithTx: mockAddTableColumnsWithTx, + importAppendRows: mockImportAppendRows, + importReplaceRows: mockImportReplaceRows, dispatchAfterBatchInsert: mockDispatchAfterBatchInsert, markTableImporting: mockMarkTableImporting, releaseImportClaim: mockReleaseImportClaim, @@ -125,6 +123,16 @@ function buildTable(overrides: Partial = {}): TableDefinition { } } +/** Additions array the route passed to importAppendRows (2nd positional arg). */ +function appendAdditions(): { name: string; type: string }[] { + return mockImportAppendRows.mock.calls[0][1] as { name: string; type: string }[] +} + +/** Rows array the route passed to importAppendRows (3rd positional arg). */ +function appendRows(): unknown[] { + return mockImportAppendRows.mock.calls[0][2] as unknown[] +} + async function callPost(form: FormData, { tableId }: { tableId: string } = { tableId: 'tbl_1' }) { // Building the request from a FormData body gives a real multipart stream and // boundary, exercising the streaming `readMultipart` parser end-to-end. @@ -144,27 +152,15 @@ describe('POST /api/table/[tableId]/import', () => { authType: 'session', }) mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable() }) - mockBatchInsertRowsWithTx.mockImplementation(async (_trx, data: { rows: unknown[] }) => - data.rows.map((_, i) => ({ id: `row_${i}` })) + mockImportAppendRows.mockImplementation( + async (table: TableDefinition, _additions: unknown, rows: unknown[]) => ({ + inserted: rows.map((_, i) => ({ id: `row_${i}` })), + table, + }) ) - mockReplaceTableRowsWithTx.mockResolvedValue({ deletedCount: 0, insertedCount: 0 }) + mockImportReplaceRows.mockResolvedValue({ deletedCount: 0, insertedCount: 0 }) mockMarkTableImporting.mockResolvedValue(true) mockReleaseImportClaim.mockResolvedValue(undefined) - mockAddTableColumnsWithTx.mockImplementation( - async ( - _trx, - table: { schema: { columns: { name: string; type: string }[] } }, - columns: { name: string; type: string }[] - ) => ({ - ...table, - schema: { - columns: [ - ...table.schema.columns, - ...columns.map((c) => ({ name: c.name, type: c.type as 'string' })), - ], - }, - }) - ) }) it('returns 401 when the user is not authenticated', async () => { @@ -180,8 +176,8 @@ describe('POST /api/table/[tableId]/import', () => { mockMarkTableImporting.mockResolvedValueOnce(false) const response = await callPost(createFormData(createCsvFile('name,age\nAlice,30'))) expect(response.status).toBe(409) - expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled() - expect(mockReplaceTableRowsWithTx).not.toHaveBeenCalled() + expect(mockImportAppendRows).not.toHaveBeenCalled() + expect(mockImportReplaceRows).not.toHaveBeenCalled() expect(mockReleaseImportClaim).not.toHaveBeenCalled() }) @@ -242,8 +238,8 @@ describe('POST /api/table/[tableId]/import', () => { const response = await POST(req, { params: Promise.resolve({ tableId: 'tbl_1' }) }) expect(response.status).toBe(400) - expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled() - expect(mockReplaceTableRowsWithTx).not.toHaveBeenCalled() + expect(mockImportAppendRows).not.toHaveBeenCalled() + expect(mockImportReplaceRows).not.toHaveBeenCalled() }) it('returns 400 when the CSV is missing a required column', async () => { @@ -252,10 +248,10 @@ describe('POST /api/table/[tableId]/import', () => { const data = await response.json() expect(data.error).toMatch(/missing required columns/i) expect(data.details?.missingRequired).toEqual(['name']) - expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled() + expect(mockImportAppendRows).not.toHaveBeenCalled() }) - it('appends rows via batchInsertRows', async () => { + it('appends rows via importAppendRows', async () => { const response = await callPost( createFormData(createCsvFile('name,age\nAlice,30\nBob,40'), { mode: 'append' }) ) @@ -263,13 +259,12 @@ describe('POST /api/table/[tableId]/import', () => { const data = await response.json() expect(data.data.mode).toBe('append') expect(data.data.insertedCount).toBe(2) - expect(mockBatchInsertRowsWithTx).toHaveBeenCalledTimes(1) - const callArgs = mockBatchInsertRowsWithTx.mock.calls[0][1] as { rows: unknown[] } - expect(callArgs.rows).toEqual([ + expect(mockImportAppendRows).toHaveBeenCalledTimes(1) + expect(appendRows()).toEqual([ { name: 'Alice', age: 30 }, { name: 'Bob', age: 40 }, ]) - expect(mockReplaceTableRowsWithTx).not.toHaveBeenCalled() + expect(mockImportReplaceRows).not.toHaveBeenCalled() }) it('accepts chunked multipart imports without a content-length header', async () => { @@ -284,7 +279,7 @@ describe('POST /api/table/[tableId]/import', () => { const response = await POST(req, { params: Promise.resolve({ tableId: 'tbl_1' }) }) expect(response.status).toBe(200) - expect(mockBatchInsertRowsWithTx).toHaveBeenCalledTimes(1) + expect(mockImportAppendRows).toHaveBeenCalledTimes(1) }) it('rejects append when it would exceed maxRows', async () => { @@ -298,11 +293,11 @@ describe('POST /api/table/[tableId]/import', () => { expect(response.status).toBe(400) const data = await response.json() expect(data.error).toMatch(/exceed table row limit/) - expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled() + expect(mockImportAppendRows).not.toHaveBeenCalled() }) - it('replaces rows via replaceTableRows', async () => { - mockReplaceTableRowsWithTx.mockResolvedValueOnce({ deletedCount: 5, insertedCount: 2 }) + it('replaces rows via importReplaceRows', async () => { + mockImportReplaceRows.mockResolvedValueOnce({ deletedCount: 5, insertedCount: 2 }) const response = await callPost( createFormData(createCsvFile('name,age\nAlice,30\nBob,40'), { mode: 'replace' }) ) @@ -311,8 +306,8 @@ describe('POST /api/table/[tableId]/import', () => { expect(data.data.mode).toBe('replace') expect(data.data.deletedCount).toBe(5) expect(data.data.insertedCount).toBe(2) - expect(mockReplaceTableRowsWithTx).toHaveBeenCalledTimes(1) - expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled() + expect(mockImportReplaceRows).toHaveBeenCalledTimes(1) + expect(mockImportAppendRows).not.toHaveBeenCalled() }) it('uses an explicit mapping when provided', async () => { @@ -325,8 +320,7 @@ describe('POST /api/table/[tableId]/import', () => { expect(response.status).toBe(200) const data = await response.json() expect(data.data.mappedColumns).toEqual(['First Name', 'Years']) - const callArgs = mockBatchInsertRowsWithTx.mock.calls[0][1] as { rows: unknown[] } - expect(callArgs.rows).toEqual([ + expect(appendRows()).toEqual([ { name: 'Alice', age: 30 }, { name: 'Bob', age: 40 }, ]) @@ -356,8 +350,8 @@ describe('POST /api/table/[tableId]/import', () => { expect(data.error).toMatch(/Mapping values must be/) }) - it('surfaces unique violations from batchInsertRows as 400', async () => { - mockBatchInsertRowsWithTx.mockRejectedValueOnce( + it('surfaces unique violations from importAppendRows as 400', async () => { + mockImportAppendRows.mockRejectedValueOnce( new Error('Row 1: Column "name" must be unique. Value "Alice" already exists in row row_xxx') ) const response = await callPost( @@ -377,7 +371,7 @@ describe('POST /api/table/[tableId]/import', () => { ) ) expect(response.status).toBe(200) - expect(mockBatchInsertRowsWithTx).toHaveBeenCalledTimes(1) + expect(mockImportAppendRows).toHaveBeenCalledTimes(1) }) it('returns 400 for unsupported file extensions', async () => { @@ -398,12 +392,9 @@ describe('POST /api/table/[tableId]/import', () => { }) ) expect(response.status).toBe(200) - expect(mockAddTableColumnsWithTx).toHaveBeenCalledTimes(1) - const [, , columns] = mockAddTableColumnsWithTx.mock.calls[0] - expect(columns).toEqual([{ name: 'email', type: 'string' }]) - - const callArgs = mockBatchInsertRowsWithTx.mock.calls[0][1] as { rows: unknown[] } - expect(callArgs.rows).toEqual([ + expect(mockImportAppendRows).toHaveBeenCalledTimes(1) + expect(appendAdditions()).toEqual([{ name: 'email', type: 'string' }]) + expect(appendRows()).toEqual([ { name: 'Alice', age: 30, email: 'a@x.io' }, { name: 'Bob', age: 40, email: 'b@x.io' }, ]) @@ -417,8 +408,7 @@ describe('POST /api/table/[tableId]/import', () => { }) ) expect(response.status).toBe(200) - const [, , columns] = mockAddTableColumnsWithTx.mock.calls[0] - expect(columns).toEqual([{ name: 'score', type: 'number' }]) + expect(appendAdditions()).toEqual([{ name: 'score', type: 'number' }]) }) it('dedupes when sanitized name collides with an existing column', async () => { @@ -441,8 +431,7 @@ describe('POST /api/table/[tableId]/import', () => { }) ) expect(response.status).toBe(200) - const [, , columns] = mockAddTableColumnsWithTx.mock.calls[0] - expect(columns).toEqual([{ name: 'Email_2', type: 'string' }]) + expect(appendAdditions()).toEqual([{ name: 'Email_2', type: 'string' }]) }) it('returns 400 when createColumns references a header not in the CSV', async () => { @@ -455,8 +444,7 @@ describe('POST /api/table/[tableId]/import', () => { expect(response.status).toBe(400) const data = await response.json() expect(data.error).toMatch(/unknown CSV headers/) - expect(mockAddTableColumnsWithTx).not.toHaveBeenCalled() - expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled() + expect(mockImportAppendRows).not.toHaveBeenCalled() }) it('returns 400 when createColumns is not an array of strings', async () => { @@ -469,7 +457,7 @@ describe('POST /api/table/[tableId]/import', () => { expect(response.status).toBe(400) const data = await response.json() expect(data.error).toMatch(/createColumns must be a JSON array/) - expect(mockAddTableColumnsWithTx).not.toHaveBeenCalled() + expect(mockImportAppendRows).not.toHaveBeenCalled() }) it('returns 400 when createColumns is invalid JSON', async () => { @@ -484,8 +472,8 @@ describe('POST /api/table/[tableId]/import', () => { expect(data.error).toMatch(/createColumns must be valid JSON/) }) - it('surfaces addTableColumns failures as 400', async () => { - mockAddTableColumnsWithTx.mockRejectedValueOnce(new Error('Column "email" already exists')) + it('surfaces column-creation failures from importAppendRows as 400', async () => { + mockImportAppendRows.mockRejectedValueOnce(new Error('Column "email" already exists')) const response = await callPost( createFormData(createCsvFile('name,age,email\nAlice,30,a@x.io'), { mode: 'append', @@ -495,30 +483,30 @@ describe('POST /api/table/[tableId]/import', () => { expect(response.status).toBe(400) const data = await response.json() expect(data.error).toMatch(/already exists/) - expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled() }) it('surfaces row insert failures without success when schema was mutated', async () => { - mockBatchInsertRowsWithTx.mockRejectedValueOnce(new Error('must be unique')) + mockImportAppendRows.mockRejectedValueOnce(new Error('must be unique')) const response = await callPost( createFormData(createCsvFile('name,age,email\nAlice,30,a@x.io'), { mode: 'append', createColumns: ['email'], }) ) - expect(mockAddTableColumnsWithTx).toHaveBeenCalled() + // Route forwarded the column addition into the (now atomic) import op. + expect(appendAdditions()).toEqual([{ name: 'email', type: 'string' }]) expect(response.status).toBe(400) const data = await response.json() expect(data.success).toBeUndefined() expect(data.error).toMatch(/must be unique/) }) - it('does not call addTableColumns when createColumns is omitted', async () => { + it('passes no additions when createColumns is omitted', async () => { const response = await callPost( createFormData(createCsvFile('name,age\nAlice,30'), { mode: 'append' }) ) expect(response.status).toBe(200) - expect(mockAddTableColumnsWithTx).not.toHaveBeenCalled() + expect(appendAdditions()).toEqual([]) }) }) }) diff --git a/apps/sim/app/api/table/[tableId]/import/route.ts b/apps/sim/app/api/table/[tableId]/import/route.ts index 3d16cd636e8..5fd11d20426 100644 --- a/apps/sim/app/api/table/[tableId]/import/route.ts +++ b/apps/sim/app/api/table/[tableId]/import/route.ts @@ -1,5 +1,4 @@ import type { Readable } from 'node:stream' -import { db } from '@sim/db' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' @@ -18,23 +17,20 @@ import { isMultipartError, readMultipart } from '@/lib/core/utils/multipart' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { - addTableColumnsWithTx, - batchInsertRowsWithTx, buildAutoMapping, - CSV_MAX_BATCH_SIZE, CSV_MAX_FILE_SIZE_BYTES, type CsvHeaderMapping, CsvImportValidationError, coerceRowsForTable, createCsvParser, dispatchAfterBatchInsert, + importAppendRows, + importReplaceRows, inferColumnType, markTableImporting, releaseImportClaim, - replaceTableRowsWithTx, sanitizeName, type TableDefinition, - type TableRow, type TableSchema, validateMapping, } from '@/lib/table' @@ -275,32 +271,12 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro } try { - const txResult = await db.transaction(async (trx) => { - let working = table - if (additions.length > 0) { - working = await addTableColumnsWithTx(trx, table, additions, requestId) - } - - const allInserted: TableRow[] = [] - for (let i = 0; i < coerced.length; i += CSV_MAX_BATCH_SIZE) { - const batch = coerced.slice(i, i + CSV_MAX_BATCH_SIZE) - const batchRequestId = generateId().slice(0, 8) - const result = await batchInsertRowsWithTx( - trx, - { - tableId: working.id, - rows: batch, - workspaceId, - userId: authResult.userId, - }, - working, - batchRequestId - ) - allInserted.push(...result) - } - return { inserted: allInserted, working } - }) - const { inserted: insertedRows, working: finalTable } = txResult + const { inserted: insertedRows, table: finalTable } = await importAppendRows( + table, + additions, + coerced, + { workspaceId, userId: authResult.userId, requestId } + ) const inserted = insertedRows.length // Fire trigger + scheduler AFTER the tx commits — both read through the // global db connection and would otherwise see no rows. @@ -355,18 +331,12 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro } try { - const result = await db.transaction(async (trx) => { - let working = table - if (additions.length > 0) { - working = await addTableColumnsWithTx(trx, table, additions, requestId) - } - return replaceTableRowsWithTx( - trx, - { tableId: working.id, rows: coerced, workspaceId, userId: authResult.userId }, - working, - requestId - ) - }) + const result = await importReplaceRows( + table, + additions, + { rows: coerced, workspaceId, userId: authResult.userId }, + requestId + ) logger.info(`[${requestId}] Replace CSV imported`, { tableId: table.id, diff --git a/apps/sim/app/api/table/[tableId]/rows/route.ts b/apps/sim/app/api/table/[tableId]/rows/route.ts index 8e29e12005c..9b5ad9492db 100644 --- a/apps/sim/app/api/table/[tableId]/rows/route.ts +++ b/apps/sim/app/api/table/[tableId]/rows/route.ts @@ -1,8 +1,5 @@ -import { db } from '@sim/db' -import { tableRowExecutions, userTableRows } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' -import { and, eq, inArray, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { type BatchInsertTableRowsBodyInput, @@ -17,27 +14,20 @@ import { isZodError, validationErrorResponse } from '@/lib/api/server/validation import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' -import type { - Filter, - RowData, - RowExecutionMetadata, - RowExecutions, - Sort, - TableSchema, -} from '@/lib/table' +import type { Filter, RowData, Sort, TableSchema } from '@/lib/table' import { batchInsertRows, batchUpdateRows, deleteRowsByFilter, deleteRowsByIds, insertRow, - USER_TABLE_ROWS_SQL_NAME, updateRowsByFilter, validateBatchRows, validateRowData, validateRowSize, } from '@/lib/table' -import { buildFilterClause, buildSortClause, TableQueryValidationError } from '@/lib/table/sql' +import { queryRows } from '@/lib/table/service' +import { TableQueryValidationError } from '@/lib/table/sql' import { accessError, checkAccess } from '@/app/api/table/utils' const logger = createLogger('TableRowsAPI') @@ -268,113 +258,35 @@ export const GET = withRouteHandler( return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 }) } - const baseConditions = [ - eq(userTableRows.tableId, tableId), - eq(userTableRows.workspaceId, validated.workspaceId), - ] - - const schema = table.schema as TableSchema - - if (validated.filter) { - const filterClause = buildFilterClause( - validated.filter as Filter, - USER_TABLE_ROWS_SQL_NAME, - schema.columns - ) - if (filterClause) { - baseConditions.push(filterClause) - } - } - - let query = db - .select({ - id: userTableRows.id, - data: userTableRows.data, - position: userTableRows.position, - createdAt: userTableRows.createdAt, - updatedAt: userTableRows.updatedAt, - }) - .from(userTableRows) - .where(and(...baseConditions)) - - if (validated.sort) { - const sortClause = buildSortClause(validated.sort, USER_TABLE_ROWS_SQL_NAME, schema.columns) - if (sortClause) { - query = query.orderBy(sortClause) as typeof query - } else { - query = query.orderBy(userTableRows.position) as typeof query - } - } else { - query = query.orderBy(userTableRows.position) as typeof query - } - - let totalCount: number | null = null - if (validated.includeTotal) { - const [{ count }] = await db - .select({ count: sql`count(*)` }) - .from(userTableRows) - .where(and(...baseConditions)) - totalCount = Number(count) - } - - const rows = await query.limit(validated.limit).offset(validated.offset) - - // Sidecar: fetch per-(row, group) execution state and group into a map - // so the response preserves the legacy `row.executions[groupId]` wire - // shape. One indexed-IN scan against table_row_executions. - const executionsByRow = new Map() - if (rows.length > 0) { - const execRows = await db - .select() - .from(tableRowExecutions) - .where( - inArray( - tableRowExecutions.rowId, - rows.map((r) => r.id) - ) - ) - for (const e of execRows) { - const existing = executionsByRow.get(e.rowId) ?? {} - const meta: RowExecutionMetadata = { - status: e.status as RowExecutionMetadata['status'], - executionId: e.executionId ?? null, - jobId: e.jobId ?? null, - workflowId: e.workflowId, - error: e.error ?? null, - ...(e.runningBlockIds && e.runningBlockIds.length > 0 - ? { runningBlockIds: e.runningBlockIds } - : {}), - ...(e.blockErrors && Object.keys(e.blockErrors as Record).length > 0 - ? { blockErrors: e.blockErrors as Record } - : {}), - ...(e.cancelledAt ? { cancelledAt: e.cancelledAt.toISOString() } : {}), - } - existing[e.groupId] = meta - executionsByRow.set(e.rowId, existing) - } - } - - logger.info( - `[${requestId}] Queried ${rows.length} rows from table ${tableId} (total: ${totalCount ?? 'n/a'})` + const result = await queryRows( + table, + { + filter: validated.filter as Filter | undefined, + sort: validated.sort, + limit: validated.limit, + offset: validated.offset, + includeTotal: validated.includeTotal, + }, + requestId ) return NextResponse.json({ success: true, data: { - rows: rows.map((r) => ({ + rows: result.rows.map((r) => ({ id: r.id, data: r.data, - executions: executionsByRow.get(r.id) ?? {}, + executions: r.executions, position: r.position, createdAt: r.createdAt instanceof Date ? r.createdAt.toISOString() : String(r.createdAt), updatedAt: r.updatedAt instanceof Date ? r.updatedAt.toISOString() : String(r.updatedAt), })), - rowCount: rows.length, - totalCount, - limit: validated.limit, - offset: validated.offset, + rowCount: result.rowCount, + totalCount: result.totalCount, + limit: result.limit, + offset: result.offset, }, }) } catch (error) { diff --git a/apps/sim/app/api/v1/tables/[tableId]/rows/route.ts b/apps/sim/app/api/v1/tables/[tableId]/rows/route.ts index e736a859eaa..55cf776dc7b 100644 --- a/apps/sim/app/api/v1/tables/[tableId]/rows/route.ts +++ b/apps/sim/app/api/v1/tables/[tableId]/rows/route.ts @@ -1,8 +1,5 @@ -import { db } from '@sim/db' -import { userTableRows } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' -import { and, eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { type V1BatchInsertTableRowsBody, @@ -24,13 +21,13 @@ import { deleteRowsByFilter, deleteRowsByIds, insertRow, - USER_TABLE_ROWS_SQL_NAME, updateRowsByFilter, validateBatchRows, validateRowData, validateRowSize, } from '@/lib/table' -import { buildFilterClause, buildSortClause, TableQueryValidationError } from '@/lib/table/sql' +import { queryRows } from '@/lib/table/service' +import { TableQueryValidationError } from '@/lib/table/sql' import { accessError, checkAccess } from '@/app/api/table/utils' import { checkRateLimit, @@ -153,92 +150,33 @@ export const GET = withRouteHandler(async (request: NextRequest, context: TableR return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 }) } - const baseConditions = [ - eq(userTableRows.tableId, tableId), - eq(userTableRows.workspaceId, validated.workspaceId), - ] - - const schema = table.schema as TableSchema - - if (validated.filter) { - const filterClause = buildFilterClause( - validated.filter as Filter, - USER_TABLE_ROWS_SQL_NAME, - schema.columns - ) - if (filterClause) { - baseConditions.push(filterClause) - } - } - - let query = db - .select({ - id: userTableRows.id, - data: userTableRows.data, - position: userTableRows.position, - createdAt: userTableRows.createdAt, - updatedAt: userTableRows.updatedAt, - }) - .from(userTableRows) - .where(and(...baseConditions)) - - if (validated.sort) { - const sortClause = buildSortClause(validated.sort, USER_TABLE_ROWS_SQL_NAME, schema.columns) - if (sortClause) { - query = query.orderBy(sortClause) as typeof query - } else { - query = query.orderBy(userTableRows.position) as typeof query - } - } else { - query = query.orderBy(userTableRows.position) as typeof query - } - - const rowsPromise = query.limit(validated.limit).offset(validated.offset) - - let totalCount: number | null = null - if (validated.includeTotal) { - const countQuery = db - .select({ count: sql`count(*)` }) - .from(userTableRows) - .where(and(...baseConditions)) - const [countResult, rows] = await Promise.all([countQuery, rowsPromise]) - totalCount = Number(countResult[0].count) - return NextResponse.json({ - success: true, - data: { - rows: rows.map((r) => ({ - id: r.id, - data: r.data, - position: r.position, - createdAt: - r.createdAt instanceof Date ? r.createdAt.toISOString() : String(r.createdAt), - updatedAt: - r.updatedAt instanceof Date ? r.updatedAt.toISOString() : String(r.updatedAt), - })), - rowCount: rows.length, - totalCount, - limit: validated.limit, - offset: validated.offset, - }, - }) - } - - const rows = await rowsPromise + const result = await queryRows( + table, + { + filter: validated.filter as Filter | undefined, + sort: validated.sort, + limit: validated.limit, + offset: validated.offset, + includeTotal: validated.includeTotal, + withExecutions: false, + }, + requestId + ) return NextResponse.json({ success: true, data: { - rows: rows.map((r) => ({ + rows: result.rows.map((r) => ({ id: r.id, data: r.data, position: r.position, createdAt: r.createdAt instanceof Date ? r.createdAt.toISOString() : String(r.createdAt), updatedAt: r.updatedAt instanceof Date ? r.updatedAt.toISOString() : String(r.updatedAt), })), - rowCount: rows.length, - totalCount, - limit: validated.limit, - offset: validated.offset, + rowCount: result.rowCount, + totalCount: result.totalCount, + limit: result.limit, + offset: result.offset, }, }) } catch (error) { diff --git a/apps/sim/lib/copilot/request/tools/tables.ts b/apps/sim/lib/copilot/request/tools/tables.ts index 772c4ba03ba..acea4c83a50 100644 --- a/apps/sim/lib/copilot/request/tools/tables.ts +++ b/apps/sim/lib/copilot/request/tools/tables.ts @@ -12,7 +12,8 @@ import { TraceEvent } from '@/lib/copilot/generated/trace-events-v1' import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1' import { withCopilotSpan } from '@/lib/copilot/request/otel' import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types' -import { getTableById } from '@/lib/table/service' +import type { RowData } from '@/lib/table' +import { buildOrderedRowValues, getTableById } from '@/lib/table/service' const logger = createLogger('CopilotToolResultTables') @@ -107,16 +108,15 @@ export async function maybeWriteOutputToTable( throw new Error('Request aborted before tool mutation could be applied') } const chunk = rows.slice(i, i + BATCH_CHUNK_SIZE) - const values = chunk.map((rowData, j) => ({ - id: `row_${generateId().replace(/-/g, '')}`, + const values = buildOrderedRowValues({ tableId: outputTable, workspaceId: context.workspaceId!, - data: rowData, - position: i + j, - createdAt: now, - updatedAt: now, + rows: chunk as RowData[], + startPosition: i, + now, createdBy: context.userId, - })) + makeId: () => `row_${generateId().replace(/-/g, '')}`, + }) await tx.insert(userTableRows).values(values) } }) @@ -251,16 +251,15 @@ export async function maybeWriteReadCsvToTable( throw new Error('Request aborted before tool mutation could be applied') } const chunk = rows.slice(i, i + BATCH_CHUNK_SIZE) - const values = chunk.map((rowData, j) => ({ - id: `row_${generateId().replace(/-/g, '')}`, + const values = buildOrderedRowValues({ tableId: outputTable, workspaceId: context.workspaceId!, - data: rowData, - position: i + j, - createdAt: now, - updatedAt: now, + rows: chunk as RowData[], + startPosition: i, + now, createdBy: context.userId, - })) + makeId: () => `row_${generateId().replace(/-/g, '')}`, + }) await tx.insert(userTableRows).values(values) } }) diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index 065c61d676a..964bf037b64 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -24,6 +24,7 @@ import type { DbOrTx } from '@/lib/db/types' import { materializeExecutionData } from '@/lib/logs/execution/trace-store' import { COLUMN_TYPES, NAME_PATTERN, TABLE_LIMITS, USER_TABLE_ROWS_SQL_NAME } from './constants' import { areGroupDepsSatisfied } from './deps' +import { CSV_MAX_BATCH_SIZE } from './import' import { buildFilterClause, buildSortClause } from './sql' import { fireTableTrigger } from './trigger' import type { @@ -112,22 +113,6 @@ async function setTableTxTimeouts( await trx.execute(sql.raw(`SET LOCAL idle_in_transaction_session_timeout = '${i}ms'`)) } -/** - * Serializes writers that compute `max(position) + 1` for the same table. - * - * The row-count trigger (migration 0198) serializes capacity via a row lock on - * `user_table_definitions` — but it fires AFTER INSERT, so two concurrent - * auto-positioned inserts can read the same snapshot and assign the same - * position (the `(table_id, position)` index is non-unique). This advisory - * lock restores the pre-trigger serialization scoped to a single table, with - * no cross-table contention. Released automatically at COMMIT/ROLLBACK. - */ -async function acquireTablePositionLock(trx: DbTransaction, tableId: string) { - await trx.execute( - sql`SELECT pg_advisory_xact_lock(hashtextextended(${`user_table_rows_pos:${tableId}`}, 0))` - ) -} - /** * Serializes schema/metadata read-modify-writes for a single table so * concurrent mutators can't clobber each other's `schema` JSONB @@ -161,25 +146,10 @@ async function withLockedTable( }) } -/** - * Returns the next auto-assigned `position` for a table (max(position) + 1, or 0 - * if empty). Callers must hold `acquireTablePositionLock` to avoid two concurrent - * writers computing the same value against the same snapshot. - */ -async function nextAutoPosition(trx: DbTransaction, tableId: string): Promise { - const [{ maxPos }] = await trx - .select({ - maxPos: sql`coalesce(max(${userTableRows.position}), -1)`.mapWith(Number), - }) - .from(userTableRows) - .where(eq(userTableRows.tableId, tableId)) - return maxPos + 1 -} - /** * Starting `position` for an append import — `max(position) + 1`, or 0 when empty. Read once, * unlocked, before streaming: the import worker is the table's sole writer, so it can assign - * contiguous positions from this offset without per-batch `nextAutoPosition` scans. + * contiguous positions from this offset without per-batch position scans. */ export async function nextImportStartPosition(tableId: string): Promise { const [{ maxPos }] = await db @@ -974,6 +944,319 @@ export async function restoreTable(tableId: string, requestId: string): Promise< logger.info(`[${requestId}] Restored table ${tableId} as "${attemptedRestoreName}"`) } +/** + * Loads `tableRowExecutions` rows for the given row ids and groups them into a + * `Map` suitable for plugging into `TableRow.executions`. + */ +async function loadExecutionsByRow( + trx: DbOrTx, + rowIds: Iterable +): Promise> { + const ids = Array.from(new Set(rowIds)) + const result = new Map() + if (ids.length === 0) return result + const rows = await trx + .select() + .from(tableRowExecutions) + .where(inArray(tableRowExecutions.rowId, ids)) + for (const r of rows) { + const existing = result.get(r.rowId) ?? {} + const meta: RowExecutionMetadata = { + status: r.status as RowExecutionMetadata['status'], + executionId: r.executionId ?? null, + jobId: r.jobId ?? null, + workflowId: r.workflowId, + error: r.error ?? null, + ...(r.runningBlockIds && r.runningBlockIds.length > 0 + ? { runningBlockIds: r.runningBlockIds } + : {}), + ...(r.blockErrors && Object.keys(r.blockErrors as Record).length > 0 + ? { blockErrors: r.blockErrors as Record } + : {}), + ...(r.cancelledAt ? { cancelledAt: r.cancelledAt.toISOString() } : {}), + } + existing[r.groupId] = meta + result.set(r.rowId, existing) + } + return result +} + +/** Convenience: load executions for one row, returning `{}` when missing. */ +async function loadExecutionsForRow(trx: DbOrTx, rowId: string): Promise { + const byRow = await loadExecutionsByRow(trx, [rowId]) + return byRow.get(rowId) ?? {} +} + +/** + * Serializes writers that assign `position` for the same table. The row-count + * trigger (migration 0198) serializes capacity via a row lock on + * `user_table_definitions`, but it fires AFTER INSERT, so two concurrent + * auto-positioned inserts could read the same snapshot and assign the same + * position (the `(table_id, position)` index is non-unique). This advisory lock + * restores per-table serialization. Released at COMMIT/ROLLBACK. + */ +async function acquireRowOrderLock(trx: DbTransaction, tableId: string) { + await trx.execute( + sql`SELECT pg_advisory_xact_lock(hashtextextended(${`user_table_rows_pos:${tableId}`}, 0))` + ) +} + +/** Next append position for a table (max(position) + 1, or 0 if empty). */ +async function nextRowPosition(trx: DbTransaction, tableId: string): Promise { + const [{ maxPos }] = await trx + .select({ + maxPos: sql`coalesce(max(${userTableRows.position}), -1)`.mapWith(Number), + }) + .from(userTableRows) + .where(eq(userTableRows.tableId, tableId)) + return maxPos + 1 +} + +/** Shifts every row at or after `position` up by one (`position + 1`). */ +async function shiftRowsUpFrom(trx: DbTransaction, tableId: string, position: number) { + await trx + .update(userTableRows) + .set({ position: sql`position + 1` }) + .where(and(eq(userTableRows.tableId, tableId), gte(userTableRows.position, position))) +} + +/** Shifts every row after `position` down by one (`position - 1`). */ +async function shiftRowsDownAfter(trx: DbTransaction, tableId: string, position: number) { + await trx + .update(userTableRows) + .set({ position: sql`position - 1` }) + .where(and(eq(userTableRows.tableId, tableId), gt(userTableRows.position, position))) +} + +/** + * Reserves the `position` for a single inserted row and returns where to INSERT. + * Acquires the row-order lock, then opens a slot at `requestedPosition` (shifting + * the occupant + tail up) or computes the append position. Caller runs inside a + * transaction. + */ +async function reserveInsertPosition( + trx: DbTransaction, + tableId: string, + requestedPosition?: number +): Promise { + await acquireRowOrderLock(trx, tableId) + if (requestedPosition === undefined) { + return nextRowPosition(trx, tableId) + } + const [existing] = await trx + .select({ id: userTableRows.id }) + .from(userTableRows) + .where(and(eq(userTableRows.tableId, tableId), eq(userTableRows.position, requestedPosition))) + .limit(1) + if (existing) { + await shiftRowsUpFrom(trx, tableId, requestedPosition) + } + return requestedPosition +} + +/** + * Reserves positions for a batch of `count` rows. Opens each requested slot + * (ascending, preserving prior gaps) and returns the requested positions in + * original order; otherwise returns a contiguous append range. + */ +async function reserveBatchPositions( + trx: DbTransaction, + tableId: string, + count: number, + requestedPositions?: number[] +): Promise { + await acquireRowOrderLock(trx, tableId) + if (requestedPositions && requestedPositions.length > 0) { + for (const pos of [...requestedPositions].sort((a, b) => a - b)) { + await shiftRowsUpFrom(trx, tableId, pos) + } + return requestedPositions + } + const start = await nextRowPosition(trx, tableId) + return Array.from({ length: count }, (_, i) => start + i) +} + +/** + * Recompacts row positions to be contiguous after a bulk delete. With + * `minDeletedPos`, only rows at/after it are re-numbered; single-row deletes use + * the cheaper {@link shiftRowsDownAfter}. + */ +async function compactPositions(trx: DbTransaction, tableId: string, minDeletedPos?: number) { + if (minDeletedPos === undefined) { + await trx.execute(sql` + UPDATE user_table_rows t + SET position = r.new_pos + FROM ( + SELECT id, ROW_NUMBER() OVER (ORDER BY position) - 1 AS new_pos + FROM user_table_rows + WHERE table_id = ${tableId} + ) r + WHERE t.id = r.id AND t.table_id = ${tableId} AND t.position != r.new_pos + `) + return + } + await trx.execute(sql` + UPDATE user_table_rows t + SET position = r.new_pos + FROM ( + SELECT id, ${minDeletedPos}::int + ROW_NUMBER() OVER (ORDER BY position) - 1 AS new_pos + FROM user_table_rows + WHERE table_id = ${tableId} AND position >= ${minDeletedPos} + ) r + WHERE t.id = r.id AND t.table_id = ${tableId} AND t.position != r.new_pos + `) +} + +/** A row value ready to INSERT into `user_table_rows`, with its assigned position. */ +export interface OrderedRowValue { + id: string + tableId: string + workspaceId: string + data: RowData + position: number + createdAt: Date + updatedAt: Date + createdBy?: string +} + +/** + * Builds INSERT values for a contiguous run of rows, assigning sequential + * positions `startPosition, startPosition + 1, …`. Centralizes position + * assignment for callers that write a fresh ordered run (e.g. the copilot tool's + * replace-all write). + */ +export function buildOrderedRowValues(opts: { + tableId: string + workspaceId: string + rows: RowData[] + startPosition: number + now: Date + createdBy?: string + makeId: () => string +}): OrderedRowValue[] { + const { tableId, workspaceId, rows, startPosition, now, createdBy, makeId } = opts + return rows.map((data, i) => ({ + id: makeId(), + tableId, + workspaceId, + data, + position: startPosition + i, + createdAt: now, + updatedAt: now, + ...(createdBy ? { createdBy } : {}), + })) +} + +/** + * Inserts a single row in its own transaction: sets timeouts, reserves the + * position, and inserts. Validation and side-effect dispatch stay with the + * caller. Capacity is enforced by the `increment_user_table_row_count` trigger. + */ +async function insertOrderedRow(params: { + tableId: string + workspaceId: string + data: RowData + rowId: string + position?: number + createdBy?: string + now: Date +}): Promise<{ id: string; data: RowData; position: number; createdAt: Date; updatedAt: Date }> { + const { tableId, workspaceId, data, rowId, position, createdBy, now } = params + const [row] = await db.transaction(async (trx) => { + await setTableTxTimeouts(trx) + const targetPosition = await reserveInsertPosition(trx, tableId, position) + return trx + .insert(userTableRows) + .values({ + id: rowId, + tableId, + workspaceId, + data, + position: targetPosition, + createdAt: now, + updatedAt: now, + ...(createdBy ? { createdBy } : {}), + }) + .returning() + }) + return { + id: row.id, + data: row.data as RowData, + position: row.position, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + } +} + +/** + * Deletes a single row by id in its own transaction, then closes the positional + * gap. Returns `false` when no row matched. + */ +async function deleteOrderedRow(params: { + tableId: string + rowId: string + workspaceId: string +}): Promise { + const { tableId, rowId, workspaceId } = params + return db.transaction(async (trx) => { + await setTableTxTimeouts(trx) + const [deleted] = await trx + .delete(userTableRows) + .where( + and( + eq(userTableRows.id, rowId), + eq(userTableRows.tableId, tableId), + eq(userTableRows.workspaceId, workspaceId) + ) + ) + .returning({ position: userTableRows.position }) + if (!deleted) return false + await shiftRowsDownAfter(trx, tableId, deleted.position) + return true + }) +} + +/** + * Deletes the given row ids in batches within one transaction, then recompacts + * positions from the earliest deleted slot. Returns the deleted rows (id + prior + * position). The caller resolves which ids to delete (used by both delete-by-ids + * and delete-by-filter). + */ +async function deleteOrderedRowsByIds(params: { + tableId: string + workspaceId: string + rowIds: string[] +}): Promise<{ id: string; position: number }[]> { + const { tableId, workspaceId, rowIds } = params + if (rowIds.length === 0) return [] + return db.transaction(async (trx) => { + await setTableTxTimeouts(trx, { statementMs: 60_000 }) + const deleted: { id: string; position: number }[] = [] + for (let i = 0; i < rowIds.length; i += TABLE_LIMITS.DELETE_BATCH_SIZE) { + const batch = rowIds.slice(i, i + TABLE_LIMITS.DELETE_BATCH_SIZE) + const rows = await trx + .delete(userTableRows) + .where( + and( + eq(userTableRows.tableId, tableId), + eq(userTableRows.workspaceId, workspaceId), + inArray(userTableRows.id, batch) + ) + ) + .returning({ id: userTableRows.id, position: userTableRows.position }) + deleted.push(...rows) + } + if (deleted.length > 0) { + const minDeletedPos = deleted.reduce( + (min, r) => (r.position < min ? r.position : min), + deleted[0].position + ) + await compactPositions(trx, tableId, minDeletedPos) + } + return deleted + }) +} + /** * Inserts a single row into a table. * @@ -1016,57 +1299,14 @@ export async function insertRow( // (migration 0198): a single conditional UPDATE on user_table_definitions // increments row_count iff row_count < max_rows, taking the row lock // atomically. No app-level FOR UPDATE / COUNT needed. - const [row] = await db.transaction(async (trx) => { - await setTableTxTimeouts(trx) - - let targetPosition: number - - // The `(table_id, position)` index is non-unique, so we serialize all - // position-aware writes (explicit and auto) through the per-table - // advisory lock. Without this, two concurrent explicit-position inserts - // at the same position can both observe an empty slot, both skip the - // shift, and each INSERT a row with a duplicate `(table_id, position)`. - await acquireTablePositionLock(trx, data.tableId) - - if (data.position !== undefined) { - targetPosition = data.position - - const [existing] = await trx - .select({ id: userTableRows.id }) - .from(userTableRows) - .where( - and(eq(userTableRows.tableId, data.tableId), eq(userTableRows.position, targetPosition)) - ) - .limit(1) - - if (existing) { - await trx - .update(userTableRows) - .set({ position: sql`position + 1` }) - .where( - and( - eq(userTableRows.tableId, data.tableId), - gte(userTableRows.position, targetPosition) - ) - ) - } - } else { - targetPosition = await nextAutoPosition(trx, data.tableId) - } - - return trx - .insert(userTableRows) - .values({ - id: rowId, - tableId: data.tableId, - workspaceId: data.workspaceId, - data: data.data, - position: targetPosition, - createdAt: now, - updatedAt: now, - ...(data.userId ? { createdBy: data.userId } : {}), - }) - .returning() + const row = await insertOrderedRow({ + tableId: data.tableId, + workspaceId: data.workspaceId, + data: data.data, + rowId, + position: data.position, + createdBy: data.userId, + now, }) logger.info(`[${requestId}] Inserted row ${rowId} into table ${data.tableId}`) @@ -1181,28 +1421,9 @@ export async function batchInsertRowsWithTx( ...(data.userId ? { createdBy: data.userId } : {}), }) - await acquireTablePositionLock(trx, data.tableId) - - let insertedRows - if (data.positions && data.positions.length > 0) { - // Position-aware insert: shift existing rows to create gaps, then insert. - // Process positions ascending so each shift preserves gaps created by prior shifts. - const sortedPositions = [...data.positions].sort((a, b) => a - b) - - for (const pos of sortedPositions) { - await trx - .update(userTableRows) - .set({ position: sql`position + 1` }) - .where(and(eq(userTableRows.tableId, data.tableId), gte(userTableRows.position, pos))) - } - - const rowsToInsert = data.rows.map((rowData, i) => buildRow(rowData, data.positions![i])) - insertedRows = await trx.insert(userTableRows).values(rowsToInsert).returning() - } else { - const startPos = await nextAutoPosition(trx, data.tableId) - const rowsToInsert = data.rows.map((rowData, i) => buildRow(rowData, startPos + i)) - insertedRows = await trx.insert(userTableRows).values(rowsToInsert).returning() - } + const positions = await reserveBatchPositions(trx, data.tableId, data.rows.length, data.positions) + const rowsToInsert = data.rows.map((rowData, i) => buildRow(rowData, positions[i])) + const insertedRows = await trx.insert(userTableRows).values(rowsToInsert).returning() logger.info(`[${requestId}] Batch inserted ${data.rows.length} rows into table ${data.tableId}`) @@ -1567,7 +1788,7 @@ export async function replaceTableRowsWithTx( // snapshot for the DELETE; the second's DELETE would not observe rows the // first inserted, so both transactions commit and the table ends up with // the union of both row sets instead of only the last caller's rows. - await acquireTablePositionLock(trx, data.tableId) + await acquireRowOrderLock(trx, data.tableId) const deletedRows = await trx .delete(userTableRows) @@ -1604,6 +1825,62 @@ export async function replaceTableRowsWithTx( return { deletedCount: deletedRows.length, insertedCount } } +/** + * Owns the append-import transaction so the API route never holds a `trx`: + * optionally creates the new columns, then inserts every row in CSV-sized + * batches — all atomic. Caller fires {@link dispatchAfterBatchInsert} after this + * resolves (post-commit), mirroring the other batch-insert sites. + */ +export async function importAppendRows( + table: TableDefinition, + additions: { name: string; type: string; required?: boolean; unique?: boolean }[], + rows: RowData[], + ctx: { workspaceId: string; userId?: string; requestId: string } +): Promise<{ inserted: TableRow[]; table: TableDefinition }> { + return db.transaction(async (trx) => { + let working = table + if (additions.length > 0) { + working = await addTableColumnsWithTx(trx, table, additions, ctx.requestId) + } + const inserted: TableRow[] = [] + for (let i = 0; i < rows.length; i += CSV_MAX_BATCH_SIZE) { + const batch = rows.slice(i, i + CSV_MAX_BATCH_SIZE) + const batchInserted = await batchInsertRowsWithTx( + trx, + { tableId: working.id, rows: batch, workspaceId: ctx.workspaceId, userId: ctx.userId }, + working, + generateId().slice(0, 8) + ) + inserted.push(...batchInserted) + } + return { inserted, table: working } + }) +} + +/** + * Owns the replace-import transaction: optionally creates the new columns, then + * replaces all rows — atomically. Keeps `trx` out of the API route. + */ +export async function importReplaceRows( + table: TableDefinition, + additions: { name: string; type: string; required?: boolean; unique?: boolean }[], + data: { rows: RowData[]; workspaceId: string; userId?: string }, + requestId: string +): Promise { + return db.transaction(async (trx) => { + let working = table + if (additions.length > 0) { + working = await addTableColumnsWithTx(trx, table, additions, requestId) + } + return replaceTableRowsWithTx( + trx, + { tableId: working.id, rows: data.rows, workspaceId: data.workspaceId, userId: data.userId }, + working, + requestId + ) + }) +} + /** * Upserts a row: updates an existing row if a match is found on the conflict target * column, otherwise inserts a new row. @@ -1716,7 +1993,7 @@ export async function upsertRow( let matchedRowId = existingRow?.id let previousData = existingRow?.data as RowData | undefined if (!matchedRowId) { - await acquireTablePositionLock(trx, data.tableId) + await acquireRowOrderLock(trx, data.tableId) const [racedRow] = await trx .select({ id: userTableRows.id, data: userTableRows.data }) .from(userTableRows) @@ -1763,7 +2040,7 @@ export async function upsertRow( tableId: data.tableId, workspaceId: data.workspaceId, data: data.data, - position: await nextAutoPosition(trx, data.tableId), + position: await reserveInsertPosition(trx, data.tableId), createdAt: now, updatedAt: now, ...(data.userId ? { createdBy: data.userId } : {}), @@ -1848,12 +2125,12 @@ export async function queryRows( limit = TABLE_LIMITS.DEFAULT_QUERY_LIMIT, offset = 0, includeTotal = true, + withExecutions = true, } = options const tableName = USER_TABLE_ROWS_SQL_NAME const columns = table.schema.columns - // Build WHERE clause const baseConditions = and( eq(userTableRows.tableId, table.id), eq(userTableRows.workspaceId, table.workspaceId) @@ -1867,49 +2144,50 @@ export async function queryRows( } } - let totalCount: number | null = null - if (includeTotal) { - const countResult = await db - .select({ count: count() }) - .from(userTableRows) - .where(whereClause ?? baseConditions) - totalCount = Number(countResult[0].count) - } - - // Build ORDER BY clause (default to position ASC for stable ordering) let orderByClause if (sort && Object.keys(sort).length > 0) { orderByClause = buildSortClause(sort, tableName, columns) } - // Execute query let query = db .select() .from(userTableRows) .where(whereClause ?? baseConditions) - if (orderByClause) { query = query.orderBy(orderByClause) as typeof query } else { query = query.orderBy(userTableRows.position) as typeof query } - const rows = await query.limit(limit).offset(offset) + // Count and page fetch are independent reads — run them concurrently so the + // `includeTotal` hot path doesn't pay two serial round-trips. + const rowsPromise = query.limit(limit).offset(offset) + const countPromise = includeTotal + ? db + .select({ count: count() }) + .from(userTableRows) + .where(whereClause ?? baseConditions) + : null + + const [rows, countResult] = await Promise.all([rowsPromise, countPromise]) + const totalCount = countResult ? Number(countResult[0].count) : null + + const executionsByRow = withExecutions + ? await loadExecutionsByRow( + db, + rows.map((r) => r.id) + ) + : null logger.info( `[${requestId}] Queried ${rows.length} rows from table ${table.id} (total: ${totalCount})` ) - const executionsByRow = await loadExecutionsByRow( - db, - rows.map((r) => r.id) - ) - return { rows: rows.map((r) => ({ id: r.id, data: r.data as RowData, - executions: executionsByRow.get(r.id) ?? {}, + executions: executionsByRow?.get(r.id) ?? {}, position: r.position, createdAt: r.createdAt, updatedAt: r.updatedAt, @@ -2076,50 +2354,6 @@ function applyExecutionsPatch( return next } -/** - * Loads `tableRowExecutions` rows for the given row ids and groups them into - * a `Map` suitable for plugging into `TableRow.executions` - * everywhere callers used to read `userTableRows.executions` JSONB. - */ -async function loadExecutionsByRow( - trx: DbOrTx, - rowIds: Iterable -): Promise> { - const ids = Array.from(new Set(rowIds)) - const result = new Map() - if (ids.length === 0) return result - const rows = await trx - .select() - .from(tableRowExecutions) - .where(inArray(tableRowExecutions.rowId, ids)) - for (const r of rows) { - const existing = result.get(r.rowId) ?? {} - const meta: RowExecutionMetadata = { - status: r.status as RowExecutionMetadata['status'], - executionId: r.executionId ?? null, - jobId: r.jobId ?? null, - workflowId: r.workflowId, - error: r.error ?? null, - ...(r.runningBlockIds && r.runningBlockIds.length > 0 - ? { runningBlockIds: r.runningBlockIds } - : {}), - ...(r.blockErrors && Object.keys(r.blockErrors as Record).length > 0 - ? { blockErrors: r.blockErrors as Record } - : {}), - ...(r.cancelledAt ? { cancelledAt: r.cancelledAt.toISOString() } : {}), - } - existing[r.groupId] = meta - result.set(r.rowId, existing) - } - return result -} - -/** Convenience: load executions for one row, returning `{}` when missing. */ -async function loadExecutionsForRow(trx: DbOrTx, rowId: string): Promise { - const byRow = await loadExecutionsByRow(trx, [rowId]) - return byRow.get(rowId) ?? {} -} - /** * Writes a per-group execution patch for one row against the `tableRowExecutions` * sidecar. Non-null values upsert into the table; nulls delete the entry. When @@ -2439,26 +2673,8 @@ export async function deleteRow( workspaceId: string, requestId: string ): Promise { - await db.transaction(async (trx) => { - await setTableTxTimeouts(trx) - const [deleted] = await trx - .delete(userTableRows) - .where( - and( - eq(userTableRows.id, rowId), - eq(userTableRows.tableId, tableId), - eq(userTableRows.workspaceId, workspaceId) - ) - ) - .returning({ position: userTableRows.position }) - - if (!deleted) throw new Error('Row not found') - - await trx - .update(userTableRows) - .set({ position: sql`position - 1` }) - .where(and(eq(userTableRows.tableId, tableId), gt(userTableRows.position, deleted.position))) - }) + const deleted = await deleteOrderedRow({ tableId, rowId, workspaceId }) + if (!deleted) throw new Error('Row not found') logger.info(`[${requestId}] Deleted row ${rowId} from table ${tableId}`) } @@ -2799,43 +3015,6 @@ export async function batchUpdateRows( } } -/** - * Recompacts row positions to be contiguous after batch deletions. - * - * When `minDeletedPos` is provided, only rows with `position >= minDeletedPos` - * are re-numbered (starting from `minDeletedPos`). Rows before the earliest - * deleted position are untouched since their position is unaffected. - * - * If `minDeletedPos` is omitted, the whole table is recompacted from 0. - * Single-row deletes use the more efficient `position - 1` shift in {@link deleteRow}. - */ -async function recompactPositions(tableId: string, trx: DbTransaction, minDeletedPos?: number) { - if (minDeletedPos === undefined) { - await trx.execute(sql` - UPDATE user_table_rows t - SET position = r.new_pos - FROM ( - SELECT id, ROW_NUMBER() OVER (ORDER BY position) - 1 AS new_pos - FROM user_table_rows - WHERE table_id = ${tableId} - ) r - WHERE t.id = r.id AND t.table_id = ${tableId} AND t.position != r.new_pos - `) - return - } - - await trx.execute(sql` - UPDATE user_table_rows t - SET position = r.new_pos - FROM ( - SELECT id, ${minDeletedPos}::int + ROW_NUMBER() OVER (ORDER BY position) - 1 AS new_pos - FROM user_table_rows - WHERE table_id = ${tableId} AND position >= ${minDeletedPos} - ) r - WHERE t.id = r.id AND t.table_id = ${tableId} AND t.position != r.new_pos - `) -} - /** * Deletes multiple rows matching a filter. * @@ -2879,28 +3058,11 @@ export async function deleteRowsByFilter( } const rowIds = matchingRows.map((r) => r.id) - const minDeletedPos = matchingRows.reduce( - (min, r) => (r.position < min ? r.position : min), - matchingRows[0].position - ) - - await db.transaction(async (trx) => { - await setTableTxTimeouts(trx, { statementMs: 60_000 }) - for (let i = 0; i < rowIds.length; i += TABLE_LIMITS.DELETE_BATCH_SIZE) { - const batch = rowIds.slice(i, i + TABLE_LIMITS.DELETE_BATCH_SIZE) - await trx.delete(userTableRows).where( - and( - eq(userTableRows.tableId, table.id), - eq(userTableRows.workspaceId, table.workspaceId), - sql`${userTableRows.id} = ANY(ARRAY[${sql.join( - batch.map((id) => sql`${id}`), - sql`, ` - )}])` - ) - ) - } - await recompactPositions(table.id, trx, minDeletedPos) + await deleteOrderedRowsByIds({ + tableId: table.id, + workspaceId: table.workspaceId, + rowIds, }) logger.info(`[${requestId}] Deleted ${matchingRows.length} rows from table ${table.id}`) @@ -2924,36 +3086,10 @@ export async function deleteRowsByIds( ): Promise { const uniqueRequestedRowIds = Array.from(new Set(data.rowIds)) - const deletedRows = await db.transaction(async (trx) => { - await setTableTxTimeouts(trx, { statementMs: 60_000 }) - const deleted: { id: string; position: number }[] = [] - for (let i = 0; i < uniqueRequestedRowIds.length; i += TABLE_LIMITS.DELETE_BATCH_SIZE) { - const batch = uniqueRequestedRowIds.slice(i, i + TABLE_LIMITS.DELETE_BATCH_SIZE) - const rows = await trx - .delete(userTableRows) - .where( - and( - eq(userTableRows.tableId, data.tableId), - eq(userTableRows.workspaceId, data.workspaceId), - sql`${userTableRows.id} = ANY(ARRAY[${sql.join( - batch.map((id) => sql`${id}`), - sql`, ` - )}])` - ) - ) - .returning({ id: userTableRows.id, position: userTableRows.position }) - deleted.push(...rows) - } - - if (deleted.length > 0) { - const minDeletedPos = deleted.reduce( - (min, r) => (r.position < min ? r.position : min), - deleted[0].position - ) - await recompactPositions(data.tableId, trx, minDeletedPos) - } - - return deleted + const deletedRows = await deleteOrderedRowsByIds({ + tableId: data.tableId, + workspaceId: data.workspaceId, + rowIds: uniqueRequestedRowIds, }) const deletedIds = deletedRows.map((r) => r.id) diff --git a/apps/sim/lib/table/types.ts b/apps/sim/lib/table/types.ts index a20789a8481..63f56e292f5 100644 --- a/apps/sim/lib/table/types.ts +++ b/apps/sim/lib/table/types.ts @@ -278,6 +278,12 @@ export interface QueryOptions { * is returned as `null` to signal it was not computed. */ includeTotal?: boolean + /** + * When true (default), each returned row's `executions` is populated from the + * `tableRowExecutions` sidecar. Pass `false` to skip the join and return `{}` + * (the public v1 route does not expose executions). + */ + withExecutions?: boolean } export interface QueryResult {