diff --git a/packages/blocks/openops-tables/src/actions/update-records-batch-action.ts b/packages/blocks/openops-tables/src/actions/update-records-batch-action.ts new file mode 100644 index 000000000..eed2cda3d --- /dev/null +++ b/packages/blocks/openops-tables/src/actions/update-records-batch-action.ts @@ -0,0 +1,80 @@ +import { BlockAuth, createAction, Property } from '@openops/blocks-framework'; +import { + batchUpdateRows, + getFields, + getTableIdByTableName, + OpenOpsField, + openopsTablesDropdownProperty, + resolveTokenProvider, + TokenOrResolver, +} from '@openops/common'; +import { cacheWrapper } from '@openops/server-shared'; + +export const updateRecordsBatchAction = createAction({ + auth: BlockAuth.None(), + name: 'update_records_batch', + description: 'Update multiple existing records in an OpenOps table.', + displayName: 'Update Records Batch', + isWriteAction: true, + props: { + tableName: openopsTablesDropdownProperty(), + items: Property.Json({ + displayName: 'Items', + required: true, + description: + 'An array of objects with rowId (Baserow internal row ID as integer) and fields keyed by table field names. Example: [{ rowId: 123, fields: { Owner: "user@example.com" } }]', + }), + }, + async run(context) { + const tableName = context.propsValue.tableName as unknown as string; + const items = context.propsValue.items as unknown; + + if (!Array.isArray(items)) { + throw new Error( + 'Items must be an array of objects with rowId and fields.', + ); + } + + if ( + items.some( + (item) => + item === null || + typeof item !== 'object' || + Array.isArray(item) || + typeof item.rowId !== 'number' || + !Number.isInteger(item.rowId) || + item.fields === null || + typeof item.fields !== 'object' || + Array.isArray(item.fields), + ) + ) { + throw new Error( + 'Each item must include an integer rowId and an object fields value.', + ); + } + + const tableCacheKey = `${context.run.id}-table-${tableName}`; + const tableId = await cacheWrapper.getOrAdd( + tableCacheKey, + getTableIdByTableName, + [tableName, context.server], + ); + + const tokenOrResolver = await resolveTokenProvider(context.server); + const fieldsCacheKey = `${context.run.id}-${tableId}-fields`; + await cacheWrapper.getOrAdd( + fieldsCacheKey, + getFields, + [tableId, tokenOrResolver], + ); + + return await batchUpdateRows({ + tableId, + tokenOrResolver, + items: items as { + rowId: number; + fields: { [key: string]: any }; + }[], + }); + }, +}); diff --git a/packages/blocks/openops-tables/src/index.ts b/packages/blocks/openops-tables/src/index.ts index 3828b33c8..8f5b72270 100644 --- a/packages/blocks/openops-tables/src/index.ts +++ b/packages/blocks/openops-tables/src/index.ts @@ -5,6 +5,7 @@ import { deleteRecordAction } from './actions/delete-record-action'; import { getRecordsAction } from './actions/get-records-action'; import { getTableUrlAction } from './actions/get-table-url-action'; import { updateRecordAction } from './actions/update-record-action'; +import { updateRecordsBatchAction } from './actions/update-records-batch-action'; export const openopsTables = createBlock({ displayName: 'OpenOps Tables', @@ -17,6 +18,7 @@ export const openopsTables = createBlock({ getRecordsAction, createRecordsBatchAction, updateRecordAction, + updateRecordsBatchAction, deleteRecordAction, getTableUrlAction, ], diff --git a/packages/blocks/openops-tables/test/actions/update-records-batch-action.test.ts b/packages/blocks/openops-tables/test/actions/update-records-batch-action.test.ts new file mode 100644 index 000000000..e5ce6fba2 --- /dev/null +++ b/packages/blocks/openops-tables/test/actions/update-records-batch-action.test.ts @@ -0,0 +1,153 @@ +const cacheWrapperMock = { + getSerializedObject: jest.fn(), + setSerializedObject: jest.fn(), + getOrAdd: jest.fn(), +}; + +jest.mock('@openops/server-shared', () => ({ + ...jest.requireActual('@openops/server-shared'), + cacheWrapper: cacheWrapperMock, +})); + +const openopsCommonMock = { + ...jest.requireActual('@openops/common'), + batchUpdateRows: jest.fn(), + getFields: jest.fn(), + getTableIdByTableName: jest.fn(), + openopsTablesDropdownProperty: jest.fn().mockReturnValue({ + required: true, + defaultValue: false, + type: 'DROPDOWN', + }), + resolveTokenProvider: jest.fn(async (serverContext) => { + return { + getToken: () => serverContext.tablesDatabaseToken, + }; + }), +}; + +jest.mock('@openops/common', () => openopsCommonMock); + +import { getFields, getTableIdByTableName } from '@openops/common'; +import { nanoid } from 'nanoid'; +import { updateRecordsBatchAction } from '../../src/actions/update-records-batch-action'; + +describe('updateRecordsBatchAction', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + test('should create action with correct properties', () => { + expect(Object.keys(updateRecordsBatchAction.props).length).toBe(2); + expect(updateRecordsBatchAction.props).toMatchObject({ + tableName: { + required: true, + type: 'DROPDOWN', + }, + items: { + required: true, + type: 'JSON', + }, + }); + }); + + test('should resolve table metadata and batch update rows', async () => { + cacheWrapperMock.getOrAdd + .mockResolvedValueOnce(1) + .mockResolvedValueOnce([{ name: 'ID', primary: true }]); + openopsCommonMock.batchUpdateRows.mockResolvedValue([{ id: 101 }]); + + const context = createContext({ + items: [ + { + rowId: 1, + fields: { Owner: 'leyla@openops.com' }, + }, + ], + }); + + const result = await updateRecordsBatchAction.run(context); + + expect(cacheWrapperMock.getOrAdd).toHaveBeenNthCalledWith( + 1, + `${context.run.id}-table-${context.propsValue.tableName}`, + getTableIdByTableName, + [context.propsValue.tableName, context.server], + ); + expect(cacheWrapperMock.getOrAdd).toHaveBeenNthCalledWith( + 2, + `${context.run.id}-1-fields`, + getFields, + [ + 1, + expect.objectContaining({ + getToken: expect.any(Function), + }), + ], + ); + expect(openopsCommonMock.resolveTokenProvider).toHaveBeenCalledWith( + context.server, + ); + expect(openopsCommonMock.batchUpdateRows).toHaveBeenCalledWith({ + tableId: 1, + tokenOrResolver: expect.objectContaining({ + getToken: expect.any(Function), + }), + items: [ + { + rowId: 1, + fields: { Owner: 'leyla@openops.com' }, + }, + ], + }); + expect(result).toEqual([{ id: 101 }]); + }); + + test('should reject non-array items', async () => { + const context = createContext({ + items: { rowId: 1, fields: { Owner: 'a@b.com' } }, + }); + + await expect(updateRecordsBatchAction.run(context)).rejects.toThrow( + 'Items must be an array of objects with rowId and fields.', + ); + + expect(cacheWrapperMock.getOrAdd).not.toHaveBeenCalled(); + expect(openopsCommonMock.batchUpdateRows).not.toHaveBeenCalled(); + }); + + test('should reject invalid batch update items', async () => { + const context = createContext({ + items: [ + { + rowId: '1', + fields: { Owner: 'a@b.com' }, + }, + ], + }); + + await expect(updateRecordsBatchAction.run(context)).rejects.toThrow( + 'Each item must include an integer rowId and an object fields value.', + ); + + expect(cacheWrapperMock.getOrAdd).not.toHaveBeenCalled(); + expect(openopsCommonMock.batchUpdateRows).not.toHaveBeenCalled(); + }); +}); + +function createContext(params?: { tableName?: string; items?: unknown }) { + return { + ...jest.requireActual('@openops/blocks-framework'), + propsValue: { + tableName: params?.tableName ?? 'Opportunity', + items: params?.items ?? [], + }, + server: { + tablesDatabaseId: 1, + tablesDatabaseToken: 'token', + }, + run: { + id: nanoid(), + }, + }; +} diff --git a/packages/blocks/openops-tables/test/index.test.ts b/packages/blocks/openops-tables/test/index.test.ts index b80cfbe67..a17ba02e8 100644 --- a/packages/blocks/openops-tables/test/index.test.ts +++ b/packages/blocks/openops-tables/test/index.test.ts @@ -2,12 +2,16 @@ import { openopsTables } from '../src/index'; describe('block declaration tests', () => { test('should return block with correct number of actions', () => { - expect(Object.keys(openopsTables.actions()).length).toBe(5); + expect(Object.keys(openopsTables.actions()).length).toBe(6); expect(openopsTables.actions()).toMatchObject({ update_record: { name: 'update_record', requireAuth: true, }, + update_records_batch: { + name: 'update_records_batch', + requireAuth: true, + }, get_records: { name: 'get_records', requireAuth: true, diff --git a/packages/openops/src/lib/openops-tables/rows.ts b/packages/openops/src/lib/openops-tables/rows.ts index a3b6bc0fb..9df8b942a 100644 --- a/packages/openops/src/lib/openops-tables/rows.ts +++ b/packages/openops/src/lib/openops-tables/rows.ts @@ -42,6 +42,13 @@ export interface BatchCreateRowsParams extends RowParams { items: { [key: string]: any }[]; } +export interface BatchUpdateRowsParams extends RowParams { + items: { + rowId: number; + fields: { [key: string]: any }; + }[]; +} + export interface UpsertRowParams extends RowParams { fields: { [key: string]: any }; } @@ -69,7 +76,7 @@ class TablesAccessSemaphore { } const semaphore = TablesAccessSemaphore.getInstance(); -const MAX_BATCH_CREATE_ROWS = 200; +const MAX_BATCH_ROWS = 200; async function executeWithConcurrencyLimit( fn: () => Promise, @@ -226,11 +233,11 @@ export async function batchCreateRows( for ( let index = 0; index < batchCreateRowsParams.items.length; - index += MAX_BATCH_CREATE_ROWS + index += MAX_BATCH_ROWS ) { const items = batchCreateRowsParams.items.slice( index, - index + MAX_BATCH_CREATE_ROWS, + index + MAX_BATCH_ROWS, ); const response = await makeOpenOpsTablesPost( @@ -257,6 +264,58 @@ export async function batchCreateRows( ); } +export async function batchUpdateRows( + batchUpdateRowsParams: BatchUpdateRowsParams, +) { + if (batchUpdateRowsParams.items.length === 0) { + return []; + } + + const url = `api/database/rows/table/${batchUpdateRowsParams.tableId}/batch/?user_field_names=true`; + + return executeWithConcurrencyLimit( + async () => { + const authenticationHeader = createAxiosHeaders( + batchUpdateRowsParams.tokenOrResolver, + ); + const results = []; + + for ( + let index = 0; + index < batchUpdateRowsParams.items.length; + index += MAX_BATCH_ROWS + ) { + const items = batchUpdateRowsParams.items + .slice(index, index + MAX_BATCH_ROWS) + .map(({ rowId, fields }) => ({ + id: rowId, + ...fields, + })); + + const response = await makeOpenOpsTablesPatch( + url, + { items }, + authenticationHeader, + ); + if (Array.isArray(response)) { + results.push(...response); + } else if (response != null) { + results.push(response); + } + } + + return results; + }, + (error) => { + logger.error('Error while batch updating rows:', { + error, + url, + itemsCount: batchUpdateRowsParams.items.length, + }); + }, + ); +} + export async function deleteRow(deleteRowParams: DeleteRowParams) { const url = `api/database/rows/table/${deleteRowParams.tableId}/${deleteRowParams.rowId}/`; diff --git a/packages/openops/test/openops-tables/rows.test.ts b/packages/openops/test/openops-tables/rows.test.ts index 4237b1743..0e07c6c25 100644 --- a/packages/openops/test/openops-tables/rows.test.ts +++ b/packages/openops/test/openops-tables/rows.test.ts @@ -1,6 +1,7 @@ const makeOpenOpsTablesGetMock = jest.fn(); const makeOpenOpsTablesPatchMock = jest.fn(); const makeOpenOpsTablesPostMock = jest.fn(); +const makeOpenOpsTablesPutMock = jest.fn(); const makeOpenOpsTablesDeleteMock = jest.fn(); const createAxiosHeadersMock = jest.fn(); @@ -13,6 +14,7 @@ jest.mock('../../src/lib/openops-tables/requests-helpers', () => { makeOpenOpsTablesGet: makeOpenOpsTablesGetMock, makeOpenOpsTablesPatch: makeOpenOpsTablesPatchMock, makeOpenOpsTablesPost: makeOpenOpsTablesPostMock, + makeOpenOpsTablesPut: makeOpenOpsTablesPutMock, makeOpenOpsTablesDelete: makeOpenOpsTablesDeleteMock, }; }); @@ -57,6 +59,7 @@ import { batchCreateRows, batchDeleteRows, batchTableAggregations, + batchUpdateRows, deleteRow, getRowByPrimaryKeyValue, getRows, @@ -330,6 +333,83 @@ describe('batchCreateRows', () => { }); }); +describe('batchUpdateRows', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + test('Should split batch update requests into chunks of 200', async () => { + const items = Array.from({ length: 450 }, (_, index) => ({ + rowId: index + 1, + fields: { + Owner: `owner-${index + 1}@openops.com`, + }, + })); + + makeOpenOpsTablesPatchMock + .mockResolvedValueOnce([{ id: 1 }, { id: 2 }]) + .mockResolvedValueOnce([{ id: 3 }]) + .mockResolvedValueOnce([{ id: 4 }]); + createAxiosHeadersMock.mockReturnValue('some header'); + + const result = await batchUpdateRows({ + tableId: 1, + tokenOrResolver: 'token', + items, + }); + + expect(result).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }]); + expect(acquireMock).toBeCalledTimes(1); + expect(releaseMock).toBeCalledTimes(1); + expect(createAxiosHeadersMock).toHaveBeenCalledWith('token'); + expect(makeOpenOpsTablesPatchMock).toHaveBeenCalledTimes(3); + expect(makeOpenOpsTablesPatchMock).toHaveBeenNthCalledWith( + 1, + 'api/database/rows/table/1/batch/?user_field_names=true', + { + items: items.slice(0, 200).map((item) => ({ + id: item.rowId, + ...item.fields, + })), + }, + 'some header', + ); + expect(makeOpenOpsTablesPatchMock).toHaveBeenNthCalledWith( + 2, + 'api/database/rows/table/1/batch/?user_field_names=true', + { + items: items.slice(200, 400).map((item) => ({ + id: item.rowId, + ...item.fields, + })), + }, + 'some header', + ); + expect(makeOpenOpsTablesPatchMock).toHaveBeenNthCalledWith( + 3, + 'api/database/rows/table/1/batch/?user_field_names=true', + { + items: items.slice(400, 450).map((item) => ({ + id: item.rowId, + ...item.fields, + })), + }, + 'some header', + ); + }); + + test('Should short-circuit empty batch updates', async () => { + const result = await batchUpdateRows({ + tableId: 1, + tokenOrResolver: 'token', + items: [], + }); + + expect(result).toStrictEqual([]); + expect(makeOpenOpsTablesPatchMock).not.toHaveBeenCalled(); + }); +}); + describe('delete row', () => { beforeEach(() => { jest.clearAllMocks();