Skip to content
6 changes: 6 additions & 0 deletions services/platform/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ import type * as model_integrations_get_decrypted_credentials from "../model/int
import type * as model_integrations_get_integration from "../model/integrations/get_integration.js";
import type * as model_integrations_get_integration_by_name from "../model/integrations/get_integration_by_name.js";
import type * as model_integrations_get_workflows_for_integration from "../model/integrations/get_workflows_for_integration.js";
import type * as model_integrations_guards_is_rest_api_integration from "../model/integrations/guards/is_rest_api_integration.js";
import type * as model_integrations_guards_is_sql_integration from "../model/integrations/guards/is_sql_integration.js";
import type * as model_integrations_index from "../model/integrations/index.js";
import type * as model_integrations_list_integrations from "../model/integrations/list_integrations.js";
import type * as model_integrations_run_health_check from "../model/integrations/run_health_check.js";
Expand All @@ -306,6 +308,7 @@ import type * as model_integrations_types from "../model/integrations/types.js";
import type * as model_integrations_update_integration_internal from "../model/integrations/update_integration_internal.js";
import type * as model_integrations_update_integration_logic from "../model/integrations/update_integration_logic.js";
import type * as model_integrations_update_sync_stats from "../model/integrations/update_sync_stats.js";
import type * as model_integrations_utils_get_integration_type from "../model/integrations/utils/get_integration_type.js";
import type * as model_integrations_validators from "../model/integrations/validators.js";
import type * as model_members_index from "../model/members/index.js";
import type * as model_members_validators from "../model/members/validators.js";
Expand Down Expand Up @@ -1014,6 +1017,8 @@ declare const fullApi: ApiFromModules<{
"model/integrations/get_integration": typeof model_integrations_get_integration;
"model/integrations/get_integration_by_name": typeof model_integrations_get_integration_by_name;
"model/integrations/get_workflows_for_integration": typeof model_integrations_get_workflows_for_integration;
"model/integrations/guards/is_rest_api_integration": typeof model_integrations_guards_is_rest_api_integration;
"model/integrations/guards/is_sql_integration": typeof model_integrations_guards_is_sql_integration;
"model/integrations/index": typeof model_integrations_index;
"model/integrations/list_integrations": typeof model_integrations_list_integrations;
"model/integrations/run_health_check": typeof model_integrations_run_health_check;
Expand All @@ -1025,6 +1030,7 @@ declare const fullApi: ApiFromModules<{
"model/integrations/update_integration_internal": typeof model_integrations_update_integration_internal;
"model/integrations/update_integration_logic": typeof model_integrations_update_integration_logic;
"model/integrations/update_sync_stats": typeof model_integrations_update_sync_stats;
"model/integrations/utils/get_integration_type": typeof model_integrations_utils_get_integration_type;
"model/integrations/validators": typeof model_integrations_validators;
"model/members/index": typeof model_members_index;
"model/members/validators": typeof model_members_validators;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,44 @@

import { internalAction, type ActionCtx } from '../../_generated/server';
import { v } from 'convex/values';
import type { Doc } from '../../_generated/dataModel';
import { internal } from '../../_generated/api';
import type { SqlExecutionResult } from '../../node_only/sql/types';
import { isIntrospectionOperation } from '../../workflow/actions/integration/helpers/is_introspection_operation';
import { getIntrospectTablesQuery } from '../../workflow/actions/integration/helpers/get_introspect_tables_query';
import { getIntrospectColumnsQuery } from '../../workflow/actions/integration/helpers/get_introspect_columns_query';
import { getIntrospectionOperations } from '../../workflow/actions/integration/helpers/get_introspection_operations';
import { decryptSqlCredentials } from '../../workflow/actions/integration/helpers/decrypt_sql_credentials';
import { requiresApproval, getOperationType, type OperationConfig } from '../../workflow/actions/integration/helpers/detect_write_operation';
import { requiresApproval, getOperationType } from '../../workflow/actions/integration/helpers/detect_write_operation';
import { validateRequiredParameters } from '../../workflow/actions/integration/helpers/validate_required_parameters';
import type { Integration, SqlIntegration, SqlOperation } from '../../model/integrations/types';
import { getIntegrationType } from '../../model/integrations/utils/get_integration_type';
import { isSqlIntegration } from '../../model/integrations/guards/is_sql_integration';

/** Single operation result validator */
const operationResultValidator = v.object({
id: v.optional(v.string()),
operation: v.string(),
success: v.boolean(),
data: v.optional(v.any()),
error: v.optional(v.string()),
duration: v.optional(v.number()),
rowCount: v.optional(v.number()),
requiresApproval: v.optional(v.boolean()),
approvalId: v.optional(v.string()),
});

/** Extended type for SQL operation with query */
interface SqlOperationConfig extends OperationConfig {
query: string;
description?: string;
parametersSchema?: {
type?: string;
properties?: Record<string, {
type?: string;
description?: string;
required?: boolean;
default?: unknown;
}>;
required?: string[];
};
}
/** Batch result validator */
const batchResultValidator = v.object({
success: v.boolean(),
integration: v.string(),
results: v.array(operationResultValidator),
stats: v.object({
totalTime: v.number(),
successCount: v.number(),
failureCount: v.number(),
approvalCount: v.number(),
}),
});

/** Single operation result */
interface OperationResult {
Expand All @@ -48,6 +60,19 @@ interface OperationResult {
approvalId?: string;
}

/** Batch result type */
interface BatchResult {
success: boolean;
integration: string;
results: OperationResult[];
stats: {
totalTime: number;
successCount: number;
failureCount: number;
approvalCount: number;
};
}

/**
* Execute multiple integration operations in parallel
*
Expand All @@ -70,7 +95,8 @@ export const executeBatchIntegrationInternal = internalAction({
threadId: v.optional(v.string()),
messageId: v.optional(v.string()),
},
handler: async (ctx, args) => {
returns: batchResultValidator,
handler: async (ctx, args): Promise<BatchResult> => {
const { organizationId, integrationName, operations, threadId, messageId } = args;
const startTime = Date.now();

Expand All @@ -81,10 +107,10 @@ export const executeBatchIntegrationInternal = internalAction({
});

// 1. Load integration config ONCE
const integration = (await ctx.runQuery(
const integration = await ctx.runQuery(
internal.integrations.getByNameInternal,
{ organizationId, name: integrationName },
)) as Doc<'integrations'> | null;
);

if (!integration) {
return {
Expand All @@ -105,10 +131,10 @@ export const executeBatchIntegrationInternal = internalAction({
};
}

const integrationType = integration.type ?? 'rest_api';
const integrationType = getIntegrationType(integration);

// For SQL integrations, optimize by decrypting credentials once
if (integrationType === 'sql') {
if (integrationType === 'sql' && isSqlIntegration(integration)) {
return executeSqlBatch(ctx, integration, operations, threadId, messageId, startTime);
}

Expand All @@ -123,33 +149,13 @@ export const executeBatchIntegrationInternal = internalAction({
*/
async function executeSqlBatch(
ctx: ActionCtx,
integration: Doc<'integrations'>,
integration: SqlIntegration,
operations: Array<{ id?: string; operation: string; params?: Record<string, unknown> }>,
threadId: string | undefined,
messageId: string | undefined,
startTime: number,
) {
const sqlConnectionConfig = integration.sqlConnectionConfig;
const sqlOperations = (integration.sqlOperations ?? []) as SqlOperationConfig[];

if (!sqlConnectionConfig) {
return {
success: false,
integration: integration.name,
results: operations.map((op) => ({
id: op.id,
operation: op.operation,
success: false,
error: `SQL integration "${integration.name}" is missing sqlConnectionConfig`,
})),
stats: {
totalTime: Date.now() - startTime,
successCount: 0,
failureCount: operations.length,
approvalCount: 0,
},
};
}
const { sqlConnectionConfig, sqlOperations } = integration;

// 2. Decrypt credentials ONCE
let credentials: { username: string; password: string };
Expand Down Expand Up @@ -184,7 +190,7 @@ async function executeSqlBatch(
// Handle introspection operations
let query: string;
let queryParams: Record<string, unknown> = params;
let operationConfig: SqlOperationConfig | undefined;
let operationConfig: SqlOperation | undefined;

if (isIntrospectionOperation(op.operation)) {
if (op.operation === 'introspect_tables') {
Expand Down Expand Up @@ -351,7 +357,7 @@ async function executeSqlBatch(
*/
async function executeRestApiBatch(
ctx: ActionCtx,
integration: Doc<'integrations'>,
integration: Integration,
operations: Array<{ id?: string; operation: string; params?: Record<string, unknown> }>,
organizationId: string,
threadId: string | undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import { createTool } from '@convex-dev/agent';
import type { ToolCtx } from '@convex-dev/agent';
import type { ToolDefinition } from '../types';
import { internal } from '../../_generated/api';
import type { Doc } from '../../_generated/dataModel';
import type { IntegrationIntrospectionResult } from './types';
import { getPredefinedIntegration } from '../../predefined_integrations';
import { getIntrospectionOperations } from '../../workflow/actions/integration/helpers/get_introspection_operations';
import { isSqlIntegration } from '../../model/integrations/guards/is_sql_integration';

const integrationIntrospectArgs = z.object({
integrationName: z
Expand Down Expand Up @@ -68,10 +68,10 @@ WORKFLOW:
}

// Fetch the specific integration
const integration = (await ctx.runQuery(
const integration = await ctx.runQuery(
internal.integrations.getByNameInternal,
{ organizationId, name: args.integrationName },
)) as Doc<'integrations'> | null;
);

if (!integration) {
throw new Error(
Expand All @@ -80,12 +80,9 @@ WORKFLOW:
);
}

const integrationType = (integration as any).type || 'rest_api';

// Handle SQL integrations
if (integrationType === 'sql') {
const sqlConfig = (integration as any).sqlConnectionConfig;
const sqlOperations = (integration as any).sqlOperations || [];
if (isSqlIntegration(integration)) {
const { sqlConnectionConfig, sqlOperations } = integration;

// Get introspection operation names (always available for SQL)
const introspectionOpNames = getIntrospectionOperations();
Expand All @@ -104,7 +101,7 @@ WORKFLOW:
// Strip SQL queries from operations to reduce token usage.
// AI only needs operation metadata (name, description, parameters) to select and call operations.
// The actual SQL query is only needed at execution time by the integration tool.
const operationSummaries = sqlOperations.map((op: any) => ({
const operationSummaries = sqlOperations.map((op) => ({
name: op.name,
title: op.title,
description: op.description,
Expand All @@ -117,13 +114,13 @@ WORKFLOW:
integrationName: integration.name,
title: integration.title,
description: integration.description,
engine: sqlConfig.engine,
engine: sqlConnectionConfig.engine,
operations: [...introspectionOps, ...operationSummaries],
} as IntegrationIntrospectionResult;
}

// Handle REST API integrations
let connectorConfig = (integration as any).connector;
let connectorConfig = integration.connector;

if (!connectorConfig) {
// Fallback to predefined integration
Expand Down
6 changes: 3 additions & 3 deletions services/platform/convex/integrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const get = queryWithRLS({
args: {
integrationId: v.id('integrations'),
},
returns: v.union(v.any(), v.null()),
returns: v.union(IntegrationsModel.integrationDocValidator, v.null()),
handler: async (ctx, args) => {
return await IntegrationsModel.getIntegration(ctx, args.integrationId);
},
Expand All @@ -60,7 +60,7 @@ export const getByNameInternal = internalQuery({
organizationId: v.string(),
name: v.string(),
},
returns: v.union(v.any(), v.null()),
returns: v.union(IntegrationsModel.integrationDocValidator, v.null()),
handler: async (ctx, args) => {
return await IntegrationsModel.getIntegrationByName(ctx, args);
},
Expand Down Expand Up @@ -186,7 +186,7 @@ export const getIntegrationInternal = internalQuery({
args: {
integrationId: v.id('integrations'),
},
returns: v.union(v.any(), v.null()),
returns: v.union(IntegrationsModel.integrationDocValidator, v.null()),
handler: async (ctx, args) => {
return await IntegrationsModel.getIntegration(ctx, args.integrationId);
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { Integration, RestApiIntegration } from '../types';

/**
* Type guard to check if an integration is a REST API integration with connector.
* Narrows the type to RestApiIntegration when true.
*
* @example
* if (isRestApiIntegration(integration)) {
* // integration.connector is now typed
* const operations = integration.connector.operations;
* }
*/
export function isRestApiIntegration(integration: Integration): integration is RestApiIntegration {
return (
// Accept undefined type for backward compatibility with legacy integrations
(integration.type === 'rest_api' || integration.type === undefined) &&
integration.connector !== undefined
);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { Integration, SqlIntegration } from '../types';

/**
* Type guard to check if an integration is an SQL integration.
* Narrows the type to SqlIntegration when true.
*
* @example
* if (isSqlIntegration(integration)) {
* // integration.sqlConnectionConfig is now typed
* const engine = integration.sqlConnectionConfig.engine;
* }
*/
export function isSqlIntegration(integration: Integration): integration is SqlIntegration {
return (
integration.type === 'sql' &&
integration.sqlConnectionConfig !== undefined &&
integration.sqlOperations !== undefined
);
}
34 changes: 34 additions & 0 deletions services/platform/convex/model/integrations/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/

import type { Infer } from 'convex/values';
import type { Doc } from '../../_generated/dataModel';
import {
apiKeyAuthEncryptedValidator,
apiKeyAuthValidator,
Expand Down Expand Up @@ -71,3 +72,36 @@ export interface DecryptedCredentials {
// =============================================================================

export const SHOPIFY_API_VERSION = '2024-01';

// =============================================================================
// TYPED INTEGRATION INTERFACES
// =============================================================================

/**
* Base integration type alias for Doc<'integrations'>.
* Use the type guards below to narrow to specific integration types.
*/
export type Integration = Doc<'integrations'>;

/**
* SQL Integration - integration with type='sql' and required SQL-specific fields.
* Use `isSqlIntegration()` to safely narrow to this type.
*/
export interface SqlIntegration extends Integration {
type: 'sql';
sqlConnectionConfig: SqlConnectionConfig;
sqlOperations: SqlOperation[];
}

/**
* REST API Integration - integration with type='rest_api' (or undefined for legacy) and connector config.
* Use `isRestApiIntegration()` to safely narrow to this type.
*
* Note: type can be undefined for backward compatibility with legacy integrations
* that were created before the type field was added.
*/
export interface RestApiIntegration extends Integration {
type: 'rest_api' | undefined;
connector: ConnectorConfig;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import type { Integration, IntegrationType } from '../types';

/**
* Gets the integration type, defaulting to 'rest_api' for backward compatibility.
* Use this instead of directly accessing integration.type to handle legacy integrations.
*/
export function getIntegrationType(integration: Integration): IntegrationType {
return integration.type ?? 'rest_api';
}
Loading