diff --git a/services/platform/convex/_generated/api.d.ts b/services/platform/convex/_generated/api.d.ts index 5b992ec40..9363a0d44 100644 --- a/services/platform/convex/_generated/api.d.ts +++ b/services/platform/convex/_generated/api.d.ts @@ -2708,6 +2708,7 @@ export declare const components: { maximumRowsRead?: number; numItems: number; }; + select?: Array; sortBy?: { direction: "asc" | "desc"; field: string }; where?: Array<{ connector?: "AND" | "OR"; diff --git a/services/platform/convex/agent_tools/workflows/internal_mutations.ts b/services/platform/convex/agent_tools/workflows/internal_mutations.ts index 7da3b08fa..b04be2c74 100644 --- a/services/platform/convex/agent_tools/workflows/internal_mutations.ts +++ b/services/platform/convex/agent_tools/workflows/internal_mutations.ts @@ -1,5 +1,4 @@ import { saveMessage } from '@convex-dev/agent'; -import { createFunctionHandle, makeFunctionReference } from 'convex/server'; import { v } from 'convex/values'; import type { Doc, Id } from '../../_generated/dataModel'; @@ -13,16 +12,15 @@ import { jsonRecordValidator } from '../../../lib/shared/schemas/utils/json-valu import { components, internal } from '../../_generated/api'; import { internalMutation } from '../../_generated/server'; import { createApproval } from '../../approvals/helpers'; -import { toSerializableConfig } from '../../custom_agents/config'; +import { + createCustomAgentHookHandles, + toSerializableConfig, +} from '../../custom_agents/config'; import { getDefaultAgentRuntimeConfig } from '../../lib/agent_runtime_config'; import { checkOrganizationRateLimit } from '../../lib/rate_limiter/helpers'; import { persistentStreaming } from '../../streaming/helpers'; import { stepConfigValidator } from '../../workflow_engine/types/nodes'; -const beforeGenerateHookRef = makeFunctionReference<'action'>( - 'lib/agent_chat/internal_actions:beforeGenerateHook', -); - type ApprovalMetadata = Doc<'approvals'>['metadata']; export const claimWorkflowApprovalForExecution = internalMutation({ @@ -92,26 +90,31 @@ export const triggerWorkflowCompletionResponse = internalMutation({ handler: async (ctx, args): Promise => { const { threadId, organizationId, messageContent } = args; - const systemChatQuery = ctx.db - .query('customAgents') - .withIndex('by_org_system_slug', (q) => - q.eq('organizationId', organizationId).eq('systemAgentSlug', 'chat'), - ); + // Resolve the agent from thread metadata + const threadMeta = await ctx.db + .query('threadMetadata') + .withIndex('by_threadId', (q) => q.eq('threadId', threadId)) + .first(); - let chatAgent = null; - for await (const agent of systemChatQuery) { - if (agent.status === 'active') { - chatAgent = agent; - break; - } + const customAgentId = threadMeta?.customAgentId; + if (!customAgentId) { + throw new Error( + `[triggerWorkflowCompletionResponse] Thread ${threadId} has no customAgentId`, + ); } + const chatAgent = await ctx.db + .query('customAgents') + .withIndex('by_root_status', (q) => + q.eq('rootVersionId', customAgentId).eq('status', 'active'), + ) + .filter((q) => q.eq(q.field('organizationId'), organizationId)) + .first(); + if (!chatAgent) { - console.warn( - '[triggerWorkflowCompletionResponse] System default chat agent not found for org:', - organizationId, + throw new Error( + `[triggerWorkflowCompletionResponse] Active agent not found for rootVersionId: ${customAgentId}`, ); - return; } const thread = await ctx.runQuery(components.agent.threads.getThread, { @@ -131,11 +134,6 @@ export const triggerWorkflowCompletionResponse = internalMutation({ const { model, provider } = getDefaultAgentRuntimeConfig(); const streamId = await persistentStreaming.createStream(ctx); - // Set generationStatus so the frontend shows loading indicator - const threadMeta = await ctx.db - .query('threadMetadata') - .withIndex('by_threadId', (q) => q.eq('threadId', threadId)) - .first(); if (threadMeta) { await ctx.db.patch(threadMeta._id, { generationStatus: 'generating' as const, @@ -143,7 +141,10 @@ export const triggerWorkflowCompletionResponse = internalMutation({ }); } - const beforeGenerate = await createFunctionHandle(beforeGenerateHookRef); + const hooks = await createCustomAgentHookHandles( + ctx, + chatAgent.filePreprocessingEnabled, + ); await ctx.scheduler.runAfter( 0, @@ -153,9 +154,9 @@ export const triggerWorkflowCompletionResponse = internalMutation({ agentConfig, model: agentConfig.model ?? model, provider, - debugTag: '[ChatAgent:WorkflowComplete]', + debugTag: `[Agent:${chatAgent.name}:WorkflowComplete]`, enableStreaming: true, - hooks: { beforeGenerate }, + hooks, threadId, organizationId, promptMessage: messageContent, diff --git a/services/platform/convex/betterAuth/_generated/component.ts b/services/platform/convex/betterAuth/_generated/component.ts index 5dfb05b48..4fe852e34 100644 --- a/services/platform/convex/betterAuth/_generated/component.ts +++ b/services/platform/convex/betterAuth/_generated/component.ts @@ -980,6 +980,7 @@ export type ComponentApi = maximumRowsRead?: number; numItems: number; }; + select?: Array; sortBy?: { direction: "asc" | "desc"; field: string }; where?: Array<{ connector?: "AND" | "OR"; diff --git a/services/platform/convex/custom_agents/test_chat.ts b/services/platform/convex/custom_agents/test_chat.ts index 8c26ecc98..a24bcf1e4 100644 --- a/services/platform/convex/custom_agents/test_chat.ts +++ b/services/platform/convex/custom_agents/test_chat.ts @@ -77,6 +77,7 @@ export const testCustomAgent = mutation({ debugTag: `[CustomAgent:${agent.name}:test]`, enableStreaming: true, hooks, + customAgentId: args.customAgentId, }); }, }); diff --git a/services/platform/convex/custom_agents/unified_chat.ts b/services/platform/convex/custom_agents/unified_chat.ts index 8773cac72..ca9216a39 100644 --- a/services/platform/convex/custom_agents/unified_chat.ts +++ b/services/platform/convex/custom_agents/unified_chat.ts @@ -115,6 +115,7 @@ export const chatWithAgent = mutation({ debugTag: `[Agent:${activeVersion.name}]`, enableStreaming: true, hooks, + customAgentId: rootVersionId, }); }, }); diff --git a/services/platform/convex/custom_agents/webhooks/internal_mutations.ts b/services/platform/convex/custom_agents/webhooks/internal_mutations.ts index e9c403357..b9df49e30 100644 --- a/services/platform/convex/custom_agents/webhooks/internal_mutations.ts +++ b/services/platform/convex/custom_agents/webhooks/internal_mutations.ts @@ -87,6 +87,7 @@ export const chatViaWebhook = internalMutation({ debugTag: `[CustomAgent:webhook:${activeVersion.name}]`, enableStreaming: args.enableStreaming ?? true, hooks, + customAgentId: args.customAgentId, }); return { threadId, streamId: result.streamId }; diff --git a/services/platform/convex/lib/agent_chat/start_agent_chat.ts b/services/platform/convex/lib/agent_chat/start_agent_chat.ts index e82ee32a6..1efdb9c88 100644 --- a/services/platform/convex/lib/agent_chat/start_agent_chat.ts +++ b/services/platform/convex/lib/agent_chat/start_agent_chat.ts @@ -13,6 +13,7 @@ import { listMessages, saveMessage } from '@convex-dev/agent'; +import type { Id } from '../../_generated/dataModel'; import type { MutationCtx } from '../../_generated/server'; import type { FileAttachment } from '../attachments'; import type { AgentType } from '../context_management/constants'; @@ -71,6 +72,8 @@ export interface StartAgentChatArgs { enableStreaming: boolean; /** Optional hooks configuration (FunctionHandles) */ hooks?: AgentHooksConfig; + /** Root version ID of the custom agent, persisted on thread metadata */ + customAgentId?: Id<'customAgents'>; } export interface StartAgentChatResult { @@ -126,6 +129,7 @@ export async function startAgentChat( await ctx.db.patch(threadMeta._id, { generationStatus: 'generating' as const, streamId, + ...(args.customAgentId ? { customAgentId: args.customAgentId } : {}), }); } diff --git a/services/platform/convex/threads/schema.ts b/services/platform/convex/threads/schema.ts index 7cf7625ee..829636cf6 100644 --- a/services/platform/convex/threads/schema.ts +++ b/services/platform/convex/threads/schema.ts @@ -14,6 +14,7 @@ export const threadMetadataTable = defineTable({ v.union(v.literal('generating'), v.literal('idle')), ), streamId: v.optional(v.string()), + customAgentId: v.optional(v.id('customAgents')), }) .index('by_threadId', ['threadId']) .index('by_userId_chatType_status', [ diff --git a/services/platform/convex/workflow_engine/helpers/engine/on_workflow_complete.test.ts b/services/platform/convex/workflow_engine/helpers/engine/on_workflow_complete.test.ts index 0de340cfb..deb258b0c 100644 --- a/services/platform/convex/workflow_engine/helpers/engine/on_workflow_complete.test.ts +++ b/services/platform/convex/workflow_engine/helpers/engine/on_workflow_complete.test.ts @@ -362,7 +362,7 @@ describe('handleWorkflowComplete', () => { expect(triggerCalls.length).toBe(0); }); - it('falls back to result.returnValue when no persisted output exists', async () => { + it('uses null output when no persisted output exists', async () => { const exec = { _id: 'exec_1', organizationId: 'org_1', @@ -378,7 +378,7 @@ describe('handleWorkflowComplete', () => { const execAfterCompletion = { ...exec, status: 'completed', - output: { simple: 'data' }, + output: null, }; const { ctx, runMutationArgs } = createMockCtx({ exec, @@ -398,7 +398,7 @@ describe('handleWorkflowComplete', () => { expect(completeCall).toBeDefined(); // oxlint-disable-next-line typescript/no-non-null-assertion, typescript/no-unsafe-type-assertion -- test assertion: completeCall is verified above const completeArgs = completeCall![1] as Record; - expect(completeArgs.output).toEqual({ simple: 'data' }); + expect(completeArgs.output).toBeNull(); }); it('emits workflow.completed event even when completeExecution is skipped', async () => { diff --git a/services/platform/convex/workflow_engine/helpers/engine/on_workflow_complete.ts b/services/platform/convex/workflow_engine/helpers/engine/on_workflow_complete.ts index 968ab5930..9d48d5543 100644 --- a/services/platform/convex/workflow_engine/helpers/engine/on_workflow_complete.ts +++ b/services/platform/convex/workflow_engine/helpers/engine/on_workflow_complete.ts @@ -13,7 +13,7 @@ import type { ComponentRunResult } from '../../types'; import { isRecord, getString } from '../../../../lib/utils/type-guards'; import { internal } from '../../../_generated/api'; -import { toConvexJsonValue, toId } from '../../../lib/type_cast_helpers'; +import { toId } from '../../../lib/type_cast_helpers'; import { emitEvent } from '../../../workflows/triggers/emit_event'; export async function handleWorkflowComplete( @@ -56,8 +56,7 @@ export async function handleWorkflowComplete( // The idempotency guard in completeExecution also protects against races. if (!wasTerminal) { // Use the already-persisted output from the serialize action. - // Fall back to result.returnValue for simple workflows that skip serialization. - const output = exec.output ?? toConvexJsonValue(result.returnValue); + const output = exec.output ?? null; await ctx.runMutation( internal.wf_executions.internal_mutations.completeExecution, { @@ -168,7 +167,7 @@ async function postCompletionMessageToThread( const messageContent = kind === 'success' - ? `[WORKFLOW_COMPLETED]\nWorkflow "${workflowName}" completed successfully.\n\nExecution Details:\n- Execution ID: ${exec._id}\n- Status: completed${outputSummary}\n\nInstructions:\n- Inform the user that the workflow has completed successfully and present the output details` + ? `[WORKFLOW_COMPLETED]\nWorkflow "${workflowName}" completed successfully.\n\nExecution Details:\n- Execution ID: ${exec._id}\n- Status: completed${outputSummary}\n\nInstructions:\n- Inform the user that the workflow has completed successfully and present the output details\n- If the output contains file download links (downloadUrl), present them to the user so they can download the files` : `[WORKFLOW_FAILED]\nWorkflow "${workflowName}" failed.\n\nExecution Details:\n- Execution ID: ${exec._id}\n- Status: failed\n- Error: ${errorMsg || 'unknown error'}\n\nInstructions:\n- Inform the user that the workflow has failed and provide the error details`; await ctx.scheduler.runAfter( diff --git a/services/platform/convex/workflow_engine/helpers/engine/serialize_and_complete_execution_handler.test.ts b/services/platform/convex/workflow_engine/helpers/engine/serialize_and_complete_execution_handler.test.ts index 004d31ee8..74aaef175 100644 --- a/services/platform/convex/workflow_engine/helpers/engine/serialize_and_complete_execution_handler.test.ts +++ b/services/platform/convex/workflow_engine/helpers/engine/serialize_and_complete_execution_handler.test.ts @@ -1,40 +1,30 @@ import { describe, it, expect, vi } from 'vitest'; /** - * Tests for the output extraction and sanitization logic in + * Tests for the output extraction logic in * serialize_and_complete_execution_handler.ts. * - * We test the pure logic (sanitizeOutputVariables and __workflowOutput extraction) - * and the storage reference resolution path. + * __workflowOutput is stored under the nested variables namespace + * (vars.variables.__workflowOutput) by persistExecutionResult. + * + * When no __workflowOutput exists, output is null (not sanitized variables). + * Variables are stored separately — the output field is reserved for + * explicit output node data only. */ -const SENSITIVE_OUTPUT_KEYS = [ - 'secrets', - 'organizationId', - 'wfDefinitionId', - 'rootWfDefinitionId', -]; - -function sanitizeOutputVariables(vars: unknown): unknown { - if (typeof vars !== 'object' || vars === null || Array.isArray(vars)) { - return vars; - } - const sanitized = { ...vars } as Record; - for (const key of SENSITIVE_OUTPUT_KEYS) { - delete sanitized[key]; - } - return sanitized; +function isRecord(val: unknown): val is Record { + return typeof val === 'object' && val !== null && !Array.isArray(val); } /** - * Mirrors the updated extraction logic that resolves _storageRef via + * Mirrors the extraction logic that resolves _storageRef via * deserializeVariablesInAction before extracting __workflowOutput. */ async function extractOutput( variablesJson: string | undefined, storageGet?: (id: string) => Promise, ): Promise { - if (!variablesJson) return {}; + if (!variablesJson) return null; try { const parsed = JSON.parse(variablesJson); @@ -47,71 +37,87 @@ async function extractOutput( } else if (typeof parsed === 'object' && parsed !== null) { vars = parsed; } else { - return {}; + return null; } - if ('__workflowOutput' in vars) { - return vars.__workflowOutput; + // __workflowOutput is stored under the variables namespace by persistExecutionResult + const nestedVars = isRecord(vars.variables) ? vars.variables : vars; + if ('__workflowOutput' in nestedVars) { + return nestedVars.__workflowOutput; } - return sanitizeOutputVariables(vars); + return null; } catch { - return {}; + return null; } } describe('extractOutput', () => { - it('returns __workflowOutput when present', async () => { + it('returns __workflowOutput from nested variables namespace', async () => { const vars = JSON.stringify({ - __workflowOutput: { analysis: 'good', score: 42 }, - secrets: { apiKey: 'secret123' }, - someStep: 'data', + input: {}, + config: {}, + variables: { + __workflowOutput: { analysis: 'good', score: 42 }, + someVar: 'data', + }, }); const result = await extractOutput(vars); expect(result).toEqual({ analysis: 'good', score: 42 }); }); it('returns null __workflowOutput when explicitly null', async () => { - const vars = JSON.stringify({ __workflowOutput: null }); + const vars = JSON.stringify({ + variables: { __workflowOutput: null }, + }); expect(await extractOutput(vars)).toBeNull(); }); it('returns array __workflowOutput', async () => { - const vars = JSON.stringify({ __workflowOutput: [1, 2, 3] }); + const vars = JSON.stringify({ + variables: { __workflowOutput: [1, 2, 3] }, + }); expect(await extractOutput(vars)).toEqual([1, 2, 3]); }); - it('falls back to sanitized variables when no __workflowOutput', async () => { + it('falls back to top-level __workflowOutput when no nested variables', async () => { const vars = JSON.stringify({ - customerId: 'cust_1', - analysis: 'done', - secrets: { apiKey: 'secret123' }, - organizationId: 'org_1', - wfDefinitionId: 'wf_1', - rootWfDefinitionId: 'root_1', + __workflowOutput: { greeting: 'hello' }, }); const result = await extractOutput(vars); - expect(result).toEqual({ - customerId: 'cust_1', - analysis: 'done', + expect(result).toEqual({ greeting: 'hello' }); + }); + + it('returns null when no __workflowOutput anywhere', async () => { + const vars = JSON.stringify({ + input: {}, + config: {}, + variables: { + customerId: 'cust_1', + analysis: 'done', + }, }); + const result = await extractOutput(vars); + expect(result).toBeNull(); }); - it('returns empty object for undefined variables', async () => { - expect(await extractOutput(undefined)).toEqual({}); + it('returns null for undefined variables', async () => { + expect(await extractOutput(undefined)).toBeNull(); }); - it('returns empty object for invalid JSON', async () => { - expect(await extractOutput('not json')).toEqual({}); + it('returns null for invalid JSON', async () => { + expect(await extractOutput('not json')).toBeNull(); }); it('resolves _storageRef and extracts __workflowOutput from blob storage', async () => { const realVariables = { - __workflowOutput: { - fileStorageId: 'kg278fznabcg53cpt8jjqzm3n982gq94', - downloadUrl: 'https://example.com/report.docx', - fileName: 'contract-comparison-report.docx', + variables: { + __workflowOutput: { + fileStorageId: 'kg278fznabcg53cpt8jjqzm3n982gq94', + downloadUrl: 'https://example.com/report.docx', + fileName: 'contract-comparison-report.docx', + }, }, - chunkResults: ['chunk1', 'chunk2'], + input: {}, }; const storageGet = vi.fn().mockResolvedValue( @@ -134,11 +140,12 @@ describe('extractOutput', () => { }); }); - it('resolves _storageRef and falls back to sanitized vars when no __workflowOutput', async () => { + it('resolves _storageRef and returns null when no __workflowOutput', async () => { const realVariables = { - customerId: 'cust_1', - analysis: 'done', - secrets: { apiKey: 'secret123' }, + variables: { + customerId: 'cust_1', + analysis: 'done', + }, }; const storageGet = vi.fn().mockResolvedValue( @@ -153,13 +160,10 @@ describe('extractOutput', () => { const result = await extractOutput(variablesJson, storageGet); - expect(result).toEqual({ - customerId: 'cust_1', - analysis: 'done', - }); + expect(result).toBeNull(); }); - it('returns empty object when _storageRef blob is not found', async () => { + it('returns null when _storageRef blob is not found', async () => { const storageGet = vi.fn().mockResolvedValue(null); const variablesJson = JSON.stringify({ @@ -167,45 +171,6 @@ describe('extractOutput', () => { }); const result = await extractOutput(variablesJson, storageGet); - expect(result).toEqual({}); - }); -}); - -describe('sanitizeOutputVariables', () => { - it('strips all sensitive keys', () => { - const result = sanitizeOutputVariables({ - data: 'ok', - secrets: { key: 'value' }, - organizationId: 'org_1', - wfDefinitionId: 'wf_1', - rootWfDefinitionId: 'root_1', - }); - expect(result).toEqual({ data: 'ok' }); - }); - - it('returns arrays as-is', () => { - expect(sanitizeOutputVariables([1, 2, 3])).toEqual([1, 2, 3]); - }); - - it('returns null as-is', () => { - expect(sanitizeOutputVariables(null)).toBeNull(); - }); - - it('returns primitives as-is', () => { - expect(sanitizeOutputVariables('hello')).toBe('hello'); - expect(sanitizeOutputVariables(42)).toBe(42); - }); - - it('preserves non-sensitive keys', () => { - const result = sanitizeOutputVariables({ - customerId: 'cust_1', - status: 'active', - items: [1, 2], - }); - expect(result).toEqual({ - customerId: 'cust_1', - status: 'active', - items: [1, 2], - }); + expect(result).toBeNull(); }); }); diff --git a/services/platform/convex/workflow_engine/helpers/engine/serialize_and_complete_execution_handler.ts b/services/platform/convex/workflow_engine/helpers/engine/serialize_and_complete_execution_handler.ts index 70e13ef15..cf127387d 100644 --- a/services/platform/convex/workflow_engine/helpers/engine/serialize_and_complete_execution_handler.ts +++ b/services/platform/convex/workflow_engine/helpers/engine/serialize_and_complete_execution_handler.ts @@ -12,6 +12,7 @@ import type { Id } from '../../../_generated/dataModel'; import type { ActionCtx } from '../../../_generated/server'; +import { isRecord } from '../../../../lib/utils/type-guards'; import { internal } from '../../../_generated/api'; import { createDebugLog } from '../../../lib/debug_log'; import { deserializeVariablesInAction } from '../serialization/deserialize_variables'; @@ -51,12 +52,12 @@ export async function handleSerializeExecutionOutput( } } - // Extract output: use output node's __workflowOutput if present, otherwise - // fall back to sanitized variables (strips secrets and system fields) + // Extract output: use output node's __workflowOutput if present. + // __workflowOutput is stored under the variables namespace by persistExecutionResult. + // When no output node exists, output is null — variables are stored separately. + const nestedVars = isRecord(vars.variables) ? vars.variables : vars; const output: unknown = - '__workflowOutput' in vars - ? vars.__workflowOutput - : sanitizeOutputVariables(vars); + '__workflowOutput' in nestedVars ? nestedVars.__workflowOutput : null; // Serialize output to storage if needed const { serialized: outputSerialized, storageId: outputStorageId } = @@ -107,22 +108,17 @@ const SENSITIVE_KEYS = [ 'rootWfDefinitionId', ]; -function sanitizeOutputVariables(vars: unknown): unknown { - if (typeof vars !== 'object' || vars === null || Array.isArray(vars)) { - return vars; - } - const sanitized = { ...vars } as Record; - for (const key of SENSITIVE_KEYS) { - delete sanitized[key]; - } - return sanitized; -} - function sanitizeVariablesForStorage( vars: Record, ): Record { const cleaned = { ...vars }; delete cleaned.__workflowOutput; + // Also clean __workflowOutput from nested variables namespace + if (isRecord(cleaned.variables)) { + const nestedCleaned = { ...cleaned.variables }; + delete nestedCleaned.__workflowOutput; + cleaned.variables = nestedCleaned; + } for (const key of SENSITIVE_KEYS) { delete cleaned[key]; }