diff --git a/services/platform/convex/_generated/api.d.ts b/services/platform/convex/_generated/api.d.ts index 987b1b499..0537e75b7 100644 --- a/services/platform/convex/_generated/api.d.ts +++ b/services/platform/convex/_generated/api.d.ts @@ -295,6 +295,8 @@ import type * as model_integrations_get_decrypted_credentials from "../model/int import type * as model_integrations_get_integration from "../model/integrations/get_integration.js"; import type * as model_integrations_get_integration_by_name from "../model/integrations/get_integration_by_name.js"; import type * as model_integrations_get_workflows_for_integration from "../model/integrations/get_workflows_for_integration.js"; +import type * as model_integrations_guards_is_rest_api_integration from "../model/integrations/guards/is_rest_api_integration.js"; +import type * as model_integrations_guards_is_sql_integration from "../model/integrations/guards/is_sql_integration.js"; import type * as model_integrations_index from "../model/integrations/index.js"; import type * as model_integrations_list_integrations from "../model/integrations/list_integrations.js"; import type * as model_integrations_run_health_check from "../model/integrations/run_health_check.js"; @@ -306,6 +308,7 @@ import type * as model_integrations_types from "../model/integrations/types.js"; import type * as model_integrations_update_integration_internal from "../model/integrations/update_integration_internal.js"; import type * as model_integrations_update_integration_logic from "../model/integrations/update_integration_logic.js"; import type * as model_integrations_update_sync_stats from "../model/integrations/update_sync_stats.js"; +import type * as model_integrations_utils_get_integration_type from "../model/integrations/utils/get_integration_type.js"; import type * as model_integrations_validators from "../model/integrations/validators.js"; import type * as model_members_index from "../model/members/index.js"; import type * as model_members_validators from "../model/members/validators.js"; @@ -1014,6 +1017,8 @@ declare const fullApi: ApiFromModules<{ "model/integrations/get_integration": typeof model_integrations_get_integration; "model/integrations/get_integration_by_name": typeof model_integrations_get_integration_by_name; "model/integrations/get_workflows_for_integration": typeof model_integrations_get_workflows_for_integration; + "model/integrations/guards/is_rest_api_integration": typeof model_integrations_guards_is_rest_api_integration; + "model/integrations/guards/is_sql_integration": typeof model_integrations_guards_is_sql_integration; "model/integrations/index": typeof model_integrations_index; "model/integrations/list_integrations": typeof model_integrations_list_integrations; "model/integrations/run_health_check": typeof model_integrations_run_health_check; @@ -1025,6 +1030,7 @@ declare const fullApi: ApiFromModules<{ "model/integrations/update_integration_internal": typeof model_integrations_update_integration_internal; "model/integrations/update_integration_logic": typeof model_integrations_update_integration_logic; "model/integrations/update_sync_stats": typeof model_integrations_update_sync_stats; + "model/integrations/utils/get_integration_type": typeof model_integrations_utils_get_integration_type; "model/integrations/validators": typeof model_integrations_validators; "model/members/index": typeof model_members_index; "model/members/validators": typeof model_members_validators; diff --git a/services/platform/convex/agent_tools/integrations/execute_batch_integration_internal.ts b/services/platform/convex/agent_tools/integrations/execute_batch_integration_internal.ts index 5054db9dc..191918600 100644 --- a/services/platform/convex/agent_tools/integrations/execute_batch_integration_internal.ts +++ b/services/platform/convex/agent_tools/integrations/execute_batch_integration_internal.ts @@ -8,7 +8,6 @@ import { internalAction, type ActionCtx } from '../../_generated/server'; import { v } from 'convex/values'; -import type { Doc } from '../../_generated/dataModel'; import { internal } from '../../_generated/api'; import type { SqlExecutionResult } from '../../node_only/sql/types'; import { isIntrospectionOperation } from '../../workflow/actions/integration/helpers/is_introspection_operation'; @@ -16,24 +15,37 @@ import { getIntrospectTablesQuery } from '../../workflow/actions/integration/hel import { getIntrospectColumnsQuery } from '../../workflow/actions/integration/helpers/get_introspect_columns_query'; import { getIntrospectionOperations } from '../../workflow/actions/integration/helpers/get_introspection_operations'; import { decryptSqlCredentials } from '../../workflow/actions/integration/helpers/decrypt_sql_credentials'; -import { requiresApproval, getOperationType, type OperationConfig } from '../../workflow/actions/integration/helpers/detect_write_operation'; +import { requiresApproval, getOperationType } from '../../workflow/actions/integration/helpers/detect_write_operation'; import { validateRequiredParameters } from '../../workflow/actions/integration/helpers/validate_required_parameters'; +import type { Integration, SqlIntegration, SqlOperation } from '../../model/integrations/types'; +import { getIntegrationType } from '../../model/integrations/utils/get_integration_type'; +import { isSqlIntegration } from '../../model/integrations/guards/is_sql_integration'; + +/** Single operation result validator */ +const operationResultValidator = v.object({ + id: v.optional(v.string()), + operation: v.string(), + success: v.boolean(), + data: v.optional(v.any()), + error: v.optional(v.string()), + duration: v.optional(v.number()), + rowCount: v.optional(v.number()), + requiresApproval: v.optional(v.boolean()), + approvalId: v.optional(v.string()), +}); -/** Extended type for SQL operation with query */ -interface SqlOperationConfig extends OperationConfig { - query: string; - description?: string; - parametersSchema?: { - type?: string; - properties?: Record; - required?: string[]; - }; -} +/** Batch result validator */ +const batchResultValidator = v.object({ + success: v.boolean(), + integration: v.string(), + results: v.array(operationResultValidator), + stats: v.object({ + totalTime: v.number(), + successCount: v.number(), + failureCount: v.number(), + approvalCount: v.number(), + }), +}); /** Single operation result */ interface OperationResult { @@ -48,6 +60,19 @@ interface OperationResult { approvalId?: string; } +/** Batch result type */ +interface BatchResult { + success: boolean; + integration: string; + results: OperationResult[]; + stats: { + totalTime: number; + successCount: number; + failureCount: number; + approvalCount: number; + }; +} + /** * Execute multiple integration operations in parallel * @@ -70,7 +95,8 @@ export const executeBatchIntegrationInternal = internalAction({ threadId: v.optional(v.string()), messageId: v.optional(v.string()), }, - handler: async (ctx, args) => { + returns: batchResultValidator, + handler: async (ctx, args): Promise => { const { organizationId, integrationName, operations, threadId, messageId } = args; const startTime = Date.now(); @@ -81,10 +107,10 @@ export const executeBatchIntegrationInternal = internalAction({ }); // 1. Load integration config ONCE - const integration = (await ctx.runQuery( + const integration = await ctx.runQuery( internal.integrations.getByNameInternal, { organizationId, name: integrationName }, - )) as Doc<'integrations'> | null; + ); if (!integration) { return { @@ -105,10 +131,10 @@ export const executeBatchIntegrationInternal = internalAction({ }; } - const integrationType = integration.type ?? 'rest_api'; + const integrationType = getIntegrationType(integration); // For SQL integrations, optimize by decrypting credentials once - if (integrationType === 'sql') { + if (integrationType === 'sql' && isSqlIntegration(integration)) { return executeSqlBatch(ctx, integration, operations, threadId, messageId, startTime); } @@ -123,33 +149,13 @@ export const executeBatchIntegrationInternal = internalAction({ */ async function executeSqlBatch( ctx: ActionCtx, - integration: Doc<'integrations'>, + integration: SqlIntegration, operations: Array<{ id?: string; operation: string; params?: Record }>, threadId: string | undefined, messageId: string | undefined, startTime: number, ) { - const sqlConnectionConfig = integration.sqlConnectionConfig; - const sqlOperations = (integration.sqlOperations ?? []) as SqlOperationConfig[]; - - if (!sqlConnectionConfig) { - return { - success: false, - integration: integration.name, - results: operations.map((op) => ({ - id: op.id, - operation: op.operation, - success: false, - error: `SQL integration "${integration.name}" is missing sqlConnectionConfig`, - })), - stats: { - totalTime: Date.now() - startTime, - successCount: 0, - failureCount: operations.length, - approvalCount: 0, - }, - }; - } + const { sqlConnectionConfig, sqlOperations } = integration; // 2. Decrypt credentials ONCE let credentials: { username: string; password: string }; @@ -184,7 +190,7 @@ async function executeSqlBatch( // Handle introspection operations let query: string; let queryParams: Record = params; - let operationConfig: SqlOperationConfig | undefined; + let operationConfig: SqlOperation | undefined; if (isIntrospectionOperation(op.operation)) { if (op.operation === 'introspect_tables') { @@ -351,7 +357,7 @@ async function executeSqlBatch( */ async function executeRestApiBatch( ctx: ActionCtx, - integration: Doc<'integrations'>, + integration: Integration, operations: Array<{ id?: string; operation: string; params?: Record }>, organizationId: string, threadId: string | undefined, diff --git a/services/platform/convex/agent_tools/integrations/integration_introspect_tool.ts b/services/platform/convex/agent_tools/integrations/integration_introspect_tool.ts index 1f01b7bd8..f8258b3b5 100644 --- a/services/platform/convex/agent_tools/integrations/integration_introspect_tool.ts +++ b/services/platform/convex/agent_tools/integrations/integration_introspect_tool.ts @@ -10,10 +10,10 @@ import { createTool } from '@convex-dev/agent'; import type { ToolCtx } from '@convex-dev/agent'; import type { ToolDefinition } from '../types'; import { internal } from '../../_generated/api'; -import type { Doc } from '../../_generated/dataModel'; import type { IntegrationIntrospectionResult } from './types'; import { getPredefinedIntegration } from '../../predefined_integrations'; import { getIntrospectionOperations } from '../../workflow/actions/integration/helpers/get_introspection_operations'; +import { isSqlIntegration } from '../../model/integrations/guards/is_sql_integration'; const integrationIntrospectArgs = z.object({ integrationName: z @@ -68,10 +68,10 @@ WORKFLOW: } // Fetch the specific integration - const integration = (await ctx.runQuery( + const integration = await ctx.runQuery( internal.integrations.getByNameInternal, { organizationId, name: args.integrationName }, - )) as Doc<'integrations'> | null; + ); if (!integration) { throw new Error( @@ -80,12 +80,9 @@ WORKFLOW: ); } - const integrationType = (integration as any).type || 'rest_api'; - // Handle SQL integrations - if (integrationType === 'sql') { - const sqlConfig = (integration as any).sqlConnectionConfig; - const sqlOperations = (integration as any).sqlOperations || []; + if (isSqlIntegration(integration)) { + const { sqlConnectionConfig, sqlOperations } = integration; // Get introspection operation names (always available for SQL) const introspectionOpNames = getIntrospectionOperations(); @@ -104,7 +101,7 @@ WORKFLOW: // Strip SQL queries from operations to reduce token usage. // AI only needs operation metadata (name, description, parameters) to select and call operations. // The actual SQL query is only needed at execution time by the integration tool. - const operationSummaries = sqlOperations.map((op: any) => ({ + const operationSummaries = sqlOperations.map((op) => ({ name: op.name, title: op.title, description: op.description, @@ -117,13 +114,13 @@ WORKFLOW: integrationName: integration.name, title: integration.title, description: integration.description, - engine: sqlConfig.engine, + engine: sqlConnectionConfig.engine, operations: [...introspectionOps, ...operationSummaries], } as IntegrationIntrospectionResult; } // Handle REST API integrations - let connectorConfig = (integration as any).connector; + let connectorConfig = integration.connector; if (!connectorConfig) { // Fallback to predefined integration diff --git a/services/platform/convex/integrations.ts b/services/platform/convex/integrations.ts index b711f9afa..5a35b6ad8 100644 --- a/services/platform/convex/integrations.ts +++ b/services/platform/convex/integrations.ts @@ -33,7 +33,7 @@ export const get = queryWithRLS({ args: { integrationId: v.id('integrations'), }, - returns: v.union(v.any(), v.null()), + returns: v.union(IntegrationsModel.integrationDocValidator, v.null()), handler: async (ctx, args) => { return await IntegrationsModel.getIntegration(ctx, args.integrationId); }, @@ -60,7 +60,7 @@ export const getByNameInternal = internalQuery({ organizationId: v.string(), name: v.string(), }, - returns: v.union(v.any(), v.null()), + returns: v.union(IntegrationsModel.integrationDocValidator, v.null()), handler: async (ctx, args) => { return await IntegrationsModel.getIntegrationByName(ctx, args); }, @@ -186,7 +186,7 @@ export const getIntegrationInternal = internalQuery({ args: { integrationId: v.id('integrations'), }, - returns: v.union(v.any(), v.null()), + returns: v.union(IntegrationsModel.integrationDocValidator, v.null()), handler: async (ctx, args) => { return await IntegrationsModel.getIntegration(ctx, args.integrationId); }, diff --git a/services/platform/convex/model/integrations/guards/is_rest_api_integration.ts b/services/platform/convex/model/integrations/guards/is_rest_api_integration.ts new file mode 100644 index 000000000..6a95031f8 --- /dev/null +++ b/services/platform/convex/model/integrations/guards/is_rest_api_integration.ts @@ -0,0 +1,19 @@ +import type { Integration, RestApiIntegration } from '../types'; + +/** + * Type guard to check if an integration is a REST API integration with connector. + * Narrows the type to RestApiIntegration when true. + * + * @example + * if (isRestApiIntegration(integration)) { + * // integration.connector is now typed + * const operations = integration.connector.operations; + * } + */ +export function isRestApiIntegration(integration: Integration): integration is RestApiIntegration { + return ( + // Accept undefined type for backward compatibility with legacy integrations + (integration.type === 'rest_api' || integration.type === undefined) && + integration.connector !== undefined + ); +} diff --git a/services/platform/convex/model/integrations/guards/is_sql_integration.ts b/services/platform/convex/model/integrations/guards/is_sql_integration.ts new file mode 100644 index 000000000..f78beda69 --- /dev/null +++ b/services/platform/convex/model/integrations/guards/is_sql_integration.ts @@ -0,0 +1,19 @@ +import type { Integration, SqlIntegration } from '../types'; + +/** + * Type guard to check if an integration is an SQL integration. + * Narrows the type to SqlIntegration when true. + * + * @example + * if (isSqlIntegration(integration)) { + * // integration.sqlConnectionConfig is now typed + * const engine = integration.sqlConnectionConfig.engine; + * } + */ +export function isSqlIntegration(integration: Integration): integration is SqlIntegration { + return ( + integration.type === 'sql' && + integration.sqlConnectionConfig !== undefined && + integration.sqlOperations !== undefined + ); +} diff --git a/services/platform/convex/model/integrations/types.ts b/services/platform/convex/model/integrations/types.ts index 3185f34da..677eedfd8 100644 --- a/services/platform/convex/model/integrations/types.ts +++ b/services/platform/convex/model/integrations/types.ts @@ -3,6 +3,7 @@ */ import type { Infer } from 'convex/values'; +import type { Doc } from '../../_generated/dataModel'; import { apiKeyAuthEncryptedValidator, apiKeyAuthValidator, @@ -71,3 +72,36 @@ export interface DecryptedCredentials { // ============================================================================= export const SHOPIFY_API_VERSION = '2024-01'; + +// ============================================================================= +// TYPED INTEGRATION INTERFACES +// ============================================================================= + +/** + * Base integration type alias for Doc<'integrations'>. + * Use the type guards below to narrow to specific integration types. + */ +export type Integration = Doc<'integrations'>; + +/** + * SQL Integration - integration with type='sql' and required SQL-specific fields. + * Use `isSqlIntegration()` to safely narrow to this type. + */ +export interface SqlIntegration extends Integration { + type: 'sql'; + sqlConnectionConfig: SqlConnectionConfig; + sqlOperations: SqlOperation[]; +} + +/** + * REST API Integration - integration with type='rest_api' (or undefined for legacy) and connector config. + * Use `isRestApiIntegration()` to safely narrow to this type. + * + * Note: type can be undefined for backward compatibility with legacy integrations + * that were created before the type field was added. + */ +export interface RestApiIntegration extends Integration { + type: 'rest_api' | undefined; + connector: ConnectorConfig; +} + diff --git a/services/platform/convex/model/integrations/utils/get_integration_type.ts b/services/platform/convex/model/integrations/utils/get_integration_type.ts new file mode 100644 index 000000000..6faf01459 --- /dev/null +++ b/services/platform/convex/model/integrations/utils/get_integration_type.ts @@ -0,0 +1,9 @@ +import type { Integration, IntegrationType } from '../types'; + +/** + * Gets the integration type, defaulting to 'rest_api' for backward compatibility. + * Use this instead of directly accessing integration.type to handle legacy integrations. + */ +export function getIntegrationType(integration: Integration): IntegrationType { + return integration.type ?? 'rest_api'; +} diff --git a/services/platform/convex/model/integrations/validators.ts b/services/platform/convex/model/integrations/validators.ts index bc44ab747..f43917214 100644 --- a/services/platform/convex/model/integrations/validators.ts +++ b/services/platform/convex/model/integrations/validators.ts @@ -198,3 +198,44 @@ export const testConnectionResultValidator = v.object({ success: v.boolean(), message: v.string(), }); + +/** + * Sync stats validator + */ +export const syncStatsValidator = v.object({ + totalRecords: v.optional(v.number()), + lastSyncCount: v.optional(v.number()), + failedSyncCount: v.optional(v.number()), +}); + +/** + * Integration document validator - matches the schema in schema.ts + * Used for typed query returns to avoid `as` type assertions + */ +export const integrationDocValidator = v.object({ + _id: v.id('integrations'), + _creationTime: v.number(), + organizationId: v.string(), + name: v.string(), + title: v.string(), + description: v.optional(v.string()), + type: v.optional(integrationTypeValidator), + status: statusValidator, + isActive: v.boolean(), + authMethod: authMethodValidator, + apiKeyAuth: v.optional(apiKeyAuthEncryptedValidator), + basicAuth: v.optional(basicAuthEncryptedValidator), + oauth2Auth: v.optional(oauth2AuthEncryptedValidator), + connectionConfig: v.optional(connectionConfigValidator), + lastSyncedAt: v.optional(v.number()), + lastTestedAt: v.optional(v.number()), + lastSuccessAt: v.optional(v.number()), + lastErrorAt: v.optional(v.number()), + errorMessage: v.optional(v.string()), + syncStats: v.optional(syncStatsValidator), + capabilities: v.optional(capabilitiesValidator), + connector: v.optional(connectorConfigValidator), + sqlConnectionConfig: v.optional(sqlConnectionConfigValidator), + sqlOperations: v.optional(v.array(sqlOperationValidator)), + metadata: v.optional(v.any()), +}); diff --git a/services/platform/convex/workflow/actions/integration/helpers/execute_sql_integration.ts b/services/platform/convex/workflow/actions/integration/helpers/execute_sql_integration.ts index 394804ccc..d74e82509 100644 --- a/services/platform/convex/workflow/actions/integration/helpers/execute_sql_integration.ts +++ b/services/platform/convex/workflow/actions/integration/helpers/execute_sql_integration.ts @@ -5,7 +5,6 @@ */ import type { ActionCtx } from '../../../../_generated/server'; -import type { Doc } from '../../../../_generated/dataModel'; import { internal } from '../../../../_generated/api'; import type { SqlExecutionResult } from '../../../../node_only/sql/types'; import { createDebugLog } from '../../../../lib/debug_log'; @@ -16,6 +15,7 @@ import { getIntrospectionOperations } from './get_introspection_operations'; import { decryptSqlCredentials } from './decrypt_sql_credentials'; import { requiresApproval, getOperationType } from './detect_write_operation'; import { validateRequiredParameters } from './validate_required_parameters'; +import { type SqlIntegration, type SqlOperation } from '../../../../model/integrations/types'; const debugLog = createDebugLog('DEBUG_INTEGRATIONS', '[Integrations]'); @@ -37,13 +37,13 @@ export interface ApprovalRequiredResult { */ export async function executeSqlIntegration( ctx: ActionCtx, - integration: Doc<'integrations'>, + integration: SqlIntegration, operation: string, params: Record, skipApprovalCheck: boolean = false, threadId?: string, messageId?: string, -): Promise { +): Promise { // Debug: Log context received by SQL integration executor debugLog('Received context:', { hasThreadId: threadId !== undefined, @@ -54,8 +54,7 @@ export async function executeSqlIntegration( integrationName: integration.name, }); - const sqlConnectionConfig = (integration as any).sqlConnectionConfig; - const sqlOperations = (integration as any).sqlOperations || []; + const { sqlConnectionConfig, sqlOperations } = integration; if (!sqlConnectionConfig) { throw new Error( @@ -66,7 +65,7 @@ export async function executeSqlIntegration( // Handle system introspection operations let query: string; let queryParams: Record = params; - let operationConfig: any = null; + let operationConfig: SqlOperation | undefined; if (isIntrospectionOperation(operation)) { // System introspection operations - never require approval @@ -97,11 +96,11 @@ export async function executeSqlIntegration( } else { // User-defined operation operationConfig = sqlOperations.find( - (op: any) => op.name === operation, + (op) => op.name === operation, ); if (!operationConfig) { - const userOps = sqlOperations.map((op: any) => op.name); + const userOps = sqlOperations.map((op) => op.name); const introspectionOps = getIntrospectionOperations(); const availableOps = [...userOps, ...introspectionOps].join(', '); throw new Error( @@ -127,7 +126,7 @@ export async function executeSqlIntegration( internal.agent_tools.integrations.create_integration_approval .createIntegrationApproval, { - organizationId: (integration as any).organizationId, + organizationId: integration.organizationId, integrationId: integration._id, integrationName: integration.name, integrationType: 'sql', diff --git a/services/platform/convex/workflow/actions/integration/integration_action.ts b/services/platform/convex/workflow/actions/integration/integration_action.ts index 2eb9ed0de..c447c8bc1 100644 --- a/services/platform/convex/workflow/actions/integration/integration_action.ts +++ b/services/platform/convex/workflow/actions/integration/integration_action.ts @@ -12,12 +12,12 @@ import { v } from 'convex/values'; import type { ActionDefinition } from '../../helpers/nodes/action/types'; import { internal } from '../../../_generated/api'; -import type { Doc } from '../../../_generated/dataModel'; import type { IntegrationExecutionResult } from '../../../node_only/integration_sandbox/types'; import { getPredefinedIntegration } from '../../../predefined_integrations'; import { buildSecretsFromIntegration } from './helpers/build_secrets_from_integration'; import { executeSqlIntegration } from './helpers/execute_sql_integration'; import { requiresApproval, getOperationType } from './helpers/detect_write_operation'; +import { isSqlIntegration } from '../../../model/integrations/guards/is_sql_integration'; import { createDebugLog } from '../../../lib/debug_log'; @@ -62,10 +62,10 @@ export const integrationAction: ActionDefinition<{ } // 1. Load the integration from database by name - const integration = (await ctx.runQuery!( + const integration = await ctx.runQuery!( internal.integrations.getByNameInternal, { organizationId, name }, - )) as Doc<'integrations'> | null; + ); if (!integration) { throw new Error( @@ -74,10 +74,8 @@ export const integrationAction: ActionDefinition<{ } // 2. Check integration type and route accordingly - const integrationType = (integration as any).type || 'rest_api'; // Default to rest_api for backward compatibility - // Handle SQL integrations - if (integrationType === 'sql') { + if (isSqlIntegration(integration)) { return await executeSqlIntegration(ctx, integration, operation, opParams, skipApprovalCheck, threadId, messageId); }