Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/platform/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2708,6 +2708,7 @@ export declare const components: {
maximumRowsRead?: number;
numItems: number;
};
select?: Array<string>;
sortBy?: { direction: "asc" | "desc"; field: string };
where?: Array<{
connector?: "AND" | "OR";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { saveMessage } from '@convex-dev/agent';
import { createFunctionHandle, makeFunctionReference } from 'convex/server';
import { v } from 'convex/values';

import type { Doc, Id } from '../../_generated/dataModel';
Expand All @@ -13,16 +12,15 @@ import { jsonRecordValidator } from '../../../lib/shared/schemas/utils/json-valu
import { components, internal } from '../../_generated/api';
import { internalMutation } from '../../_generated/server';
import { createApproval } from '../../approvals/helpers';
import { toSerializableConfig } from '../../custom_agents/config';
import {
createCustomAgentHookHandles,
toSerializableConfig,
} from '../../custom_agents/config';
import { getDefaultAgentRuntimeConfig } from '../../lib/agent_runtime_config';
import { checkOrganizationRateLimit } from '../../lib/rate_limiter/helpers';
import { persistentStreaming } from '../../streaming/helpers';
import { stepConfigValidator } from '../../workflow_engine/types/nodes';

const beforeGenerateHookRef = makeFunctionReference<'action'>(
'lib/agent_chat/internal_actions:beforeGenerateHook',
);

type ApprovalMetadata = Doc<'approvals'>['metadata'];

export const claimWorkflowApprovalForExecution = internalMutation({
Expand Down Expand Up @@ -92,26 +90,31 @@ export const triggerWorkflowCompletionResponse = internalMutation({
handler: async (ctx, args): Promise<void> => {
const { threadId, organizationId, messageContent } = args;

const systemChatQuery = ctx.db
.query('customAgents')
.withIndex('by_org_system_slug', (q) =>
q.eq('organizationId', organizationId).eq('systemAgentSlug', 'chat'),
);
// Resolve the agent from thread metadata
const threadMeta = await ctx.db
.query('threadMetadata')
.withIndex('by_threadId', (q) => q.eq('threadId', threadId))
.first();

let chatAgent = null;
for await (const agent of systemChatQuery) {
if (agent.status === 'active') {
chatAgent = agent;
break;
}
const customAgentId = threadMeta?.customAgentId;
if (!customAgentId) {
throw new Error(
`[triggerWorkflowCompletionResponse] Thread ${threadId} has no customAgentId`,
);
}

const chatAgent = await ctx.db
.query('customAgents')
.withIndex('by_root_status', (q) =>
q.eq('rootVersionId', customAgentId).eq('status', 'active'),
)
.filter((q) => q.eq(q.field('organizationId'), organizationId))
.first();

if (!chatAgent) {
console.warn(
'[triggerWorkflowCompletionResponse] System default chat agent not found for org:',
organizationId,
throw new Error(
`[triggerWorkflowCompletionResponse] Active agent not found for rootVersionId: ${customAgentId}`,
);
return;
}

const thread = await ctx.runQuery(components.agent.threads.getThread, {
Expand All @@ -131,19 +134,17 @@ export const triggerWorkflowCompletionResponse = internalMutation({
const { model, provider } = getDefaultAgentRuntimeConfig();
const streamId = await persistentStreaming.createStream(ctx);

// Set generationStatus so the frontend shows loading indicator
const threadMeta = await ctx.db
.query('threadMetadata')
.withIndex('by_threadId', (q) => q.eq('threadId', threadId))
.first();
if (threadMeta) {
await ctx.db.patch(threadMeta._id, {
generationStatus: 'generating' as const,
streamId,
});
}

const beforeGenerate = await createFunctionHandle(beforeGenerateHookRef);
const hooks = await createCustomAgentHookHandles(
ctx,
chatAgent.filePreprocessingEnabled,
);

await ctx.scheduler.runAfter(
0,
Expand All @@ -153,9 +154,9 @@ export const triggerWorkflowCompletionResponse = internalMutation({
agentConfig,
model: agentConfig.model ?? model,
provider,
debugTag: '[ChatAgent:WorkflowComplete]',
debugTag: `[Agent:${chatAgent.name}:WorkflowComplete]`,
enableStreaming: true,
hooks: { beforeGenerate },
hooks,
threadId,
organizationId,
promptMessage: messageContent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
maximumRowsRead?: number;
numItems: number;
};
select?: Array<string>;
sortBy?: { direction: "asc" | "desc"; field: string };
where?: Array<{
connector?: "AND" | "OR";
Expand Down
1 change: 1 addition & 0 deletions services/platform/convex/custom_agents/test_chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export const testCustomAgent = mutation({
debugTag: `[CustomAgent:${agent.name}:test]`,
enableStreaming: true,
hooks,
customAgentId: args.customAgentId,
});
},
});
1 change: 1 addition & 0 deletions services/platform/convex/custom_agents/unified_chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export const chatWithAgent = mutation({
debugTag: `[Agent:${activeVersion.name}]`,
enableStreaming: true,
hooks,
customAgentId: rootVersionId,
});
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export const chatViaWebhook = internalMutation({
debugTag: `[CustomAgent:webhook:${activeVersion.name}]`,
enableStreaming: args.enableStreaming ?? true,
hooks,
customAgentId: args.customAgentId,
});

return { threadId, streamId: result.streamId };
Expand Down
4 changes: 4 additions & 0 deletions services/platform/convex/lib/agent_chat/start_agent_chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import { listMessages, saveMessage } from '@convex-dev/agent';

import type { Id } from '../../_generated/dataModel';
import type { MutationCtx } from '../../_generated/server';
import type { FileAttachment } from '../attachments';
import type { AgentType } from '../context_management/constants';
Expand Down Expand Up @@ -71,6 +72,8 @@ export interface StartAgentChatArgs {
enableStreaming: boolean;
/** Optional hooks configuration (FunctionHandles) */
hooks?: AgentHooksConfig;
/** Root version ID of the custom agent, persisted on thread metadata */
customAgentId?: Id<'customAgents'>;
}

export interface StartAgentChatResult {
Expand Down Expand Up @@ -126,6 +129,7 @@ export async function startAgentChat(
await ctx.db.patch(threadMeta._id, {
generationStatus: 'generating' as const,
streamId,
...(args.customAgentId ? { customAgentId: args.customAgentId } : {}),
});
}

Expand Down
1 change: 1 addition & 0 deletions services/platform/convex/threads/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const threadMetadataTable = defineTable({
v.union(v.literal('generating'), v.literal('idle')),
),
streamId: v.optional(v.string()),
customAgentId: v.optional(v.id('customAgents')),
})
.index('by_threadId', ['threadId'])
.index('by_userId_chatType_status', [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ describe('handleWorkflowComplete', () => {
expect(triggerCalls.length).toBe(0);
});

it('falls back to result.returnValue when no persisted output exists', async () => {
it('uses null output when no persisted output exists', async () => {
const exec = {
_id: 'exec_1',
organizationId: 'org_1',
Expand All @@ -378,7 +378,7 @@ describe('handleWorkflowComplete', () => {
const execAfterCompletion = {
...exec,
status: 'completed',
output: { simple: 'data' },
output: null,
};
const { ctx, runMutationArgs } = createMockCtx({
exec,
Expand All @@ -398,7 +398,7 @@ describe('handleWorkflowComplete', () => {
expect(completeCall).toBeDefined();
// oxlint-disable-next-line typescript/no-non-null-assertion, typescript/no-unsafe-type-assertion -- test assertion: completeCall is verified above
const completeArgs = completeCall![1] as Record<string, unknown>;
expect(completeArgs.output).toEqual({ simple: 'data' });
expect(completeArgs.output).toBeNull();
});

it('emits workflow.completed event even when completeExecution is skipped', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import type { ComponentRunResult } from '../../types';

import { isRecord, getString } from '../../../../lib/utils/type-guards';
import { internal } from '../../../_generated/api';
import { toConvexJsonValue, toId } from '../../../lib/type_cast_helpers';
import { toId } from '../../../lib/type_cast_helpers';
import { emitEvent } from '../../../workflows/triggers/emit_event';

export async function handleWorkflowComplete(
Expand Down Expand Up @@ -56,8 +56,7 @@ export async function handleWorkflowComplete(
// The idempotency guard in completeExecution also protects against races.
if (!wasTerminal) {
// Use the already-persisted output from the serialize action.
// Fall back to result.returnValue for simple workflows that skip serialization.
const output = exec.output ?? toConvexJsonValue(result.returnValue);
const output = exec.output ?? null;
await ctx.runMutation(
internal.wf_executions.internal_mutations.completeExecution,
{
Expand Down Expand Up @@ -168,7 +167,7 @@ async function postCompletionMessageToThread(

const messageContent =
kind === 'success'
? `[WORKFLOW_COMPLETED]\nWorkflow "${workflowName}" completed successfully.\n\nExecution Details:\n- Execution ID: ${exec._id}\n- Status: completed${outputSummary}\n\nInstructions:\n- Inform the user that the workflow has completed successfully and present the output details`
? `[WORKFLOW_COMPLETED]\nWorkflow "${workflowName}" completed successfully.\n\nExecution Details:\n- Execution ID: ${exec._id}\n- Status: completed${outputSummary}\n\nInstructions:\n- Inform the user that the workflow has completed successfully and present the output details\n- If the output contains file download links (downloadUrl), present them to the user so they can download the files`
: `[WORKFLOW_FAILED]\nWorkflow "${workflowName}" failed.\n\nExecution Details:\n- Execution ID: ${exec._id}\n- Status: failed\n- Error: ${errorMsg || 'unknown error'}\n\nInstructions:\n- Inform the user that the workflow has failed and provide the error details`;

await ctx.scheduler.runAfter(
Expand Down
Loading
Loading