Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions examples/integrations/outlook/connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,11 @@ function mapGraphToEmailType(msg: GraphMessage, accountEmail: string) {
text: contentType === 'text' ? bodyContent : '',
html: contentType === 'html' ? bodyContent : '',
flags: msg.isRead ? ['\\Seen'] : [],
headers: {},
headers: {
'message-id': msg.internetMessageId || '',
'in-reply-to': '',
references: '',
},
attachments: (msg.attachments || [])
.filter(function (att) {
return att['@odata.type'] === '#microsoft.graph.fileAttachment';
Expand Down Expand Up @@ -308,7 +312,13 @@ function listMessages(
) {
const top = Math.min((params.top as number) || 25, 100);
const orderby = (params.orderby as string) || 'receivedDateTime desc';
const filter = (params.filter as string) || '';

// Build filter: prefer explicit cursor param, fall back to raw filter string
const cursor = params.cursor as { receivedDateTime?: string } | undefined;
let filter = (params.filter as string) || '';
if (cursor?.receivedDateTime && !filter) {
filter = 'receivedDateTime gt ' + cursor.receivedDateTime;
}

// Graph API does not support $orderby combined with $filter on conversationId.
// When this combination is detected, drop $orderby from the request and sort
Expand Down Expand Up @@ -341,7 +351,7 @@ function listMessages(
queryParts.push('$select=' + params.select);
} else {
queryParts.push(
'$select=id,subject,from,toRecipients,receivedDateTime,isRead,hasAttachments,bodyPreview',
'$select=id,internetMessageId,subject,from,toRecipients,receivedDateTime,isRead,hasAttachments,bodyPreview',
);
}
if (params.skip) {
Expand Down Expand Up @@ -377,11 +387,21 @@ function listMessages(
});
}

// Compute nextCursor from the last raw message for sync cursor advancement
const lastRaw = rawValues[rawValues.length - 1];
const nextCursor = lastRaw
? {
receivedDateTime: lastRaw.receivedDateTime || '',
messageId: lastRaw.id || '',
}
: null;

return {
success: true,
operation: 'list_messages',
data: messages,
count: messages.length,
nextCursor: nextCursor,
pagination: {
hasNextPage: !!(data as Record<string, unknown>)['@odata.nextLink'],
nextLink: (data as Record<string, unknown>)['@odata.nextLink'] || null,
Expand Down Expand Up @@ -437,7 +457,7 @@ function searchMessages(
const queryParts = [
'$top=' + top,
'$search="' + encodeURIComponent(params.query as string) + '"',
'$select=id,subject,from,toRecipients,receivedDateTime,isRead,hasAttachments,bodyPreview',
'$select=id,internetMessageId,subject,from,toRecipients,receivedDateTime,isRead,hasAttachments,bodyPreview',
];

const url = GRAPH_BASE_URL + '/me/messages?' + queryParts.join('&');
Expand Down
35 changes: 24 additions & 11 deletions examples/workflows/outlook/email-sync.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"workflowConfig": {
"name": "Email Sync (Outlook)",
"description": "Sync emails from Outlook via Microsoft Graph API into conversations with automatic thread grouping",
"version": "1.0.0",
"version": "1.1.0",
"workflowType": "predefined",
"config": {
"timeout": 300000,
Expand All @@ -23,20 +23,17 @@
"stepType": "start",
"config": {},
"nextSteps": {
"success": "query_latest_inbound_message"
"success": "read_sync_cursor"
}
},
{
"stepSlug": "query_latest_inbound_message",
"name": "Query Latest Inbound Message",
"stepSlug": "read_sync_cursor",
"name": "Read Sync Cursor",
"stepType": "action",
"config": {
"type": "conversation",
"parameters": {
"operation": "query_latest_message_by_delivery_state",
"channel": "email",
"direction": "inbound",
"deliveryState": "delivered",
"operation": "get_email_sync_cursor",
"integrationName": "outlook"
}
},
Expand All @@ -49,8 +46,8 @@
"name": "Check Has Cursor",
"stepType": "condition",
"config": {
"expression": "steps.query_latest_inbound_message.output.data.message != null",
"description": "Check if we have a previously synced message to use as cursor"
"expression": "steps.read_sync_cursor.output.data.cursor != null",
"description": "Check if we have a previously stored sync cursor"
},
"nextSteps": {
"true": "fetch_new_emails",
Expand Down Expand Up @@ -88,7 +85,7 @@
"name": "outlook",
"operation": "list_messages",
"params": {
"filter": "receivedDateTime gt {{steps.query_latest_inbound_message.output.data.message.deliveredAt|isoDate}}",
"cursor": "{{steps.read_sync_cursor.output.data.cursor}}",
"orderby": "receivedDateTime asc",
"top": 1,
"select": "id,subject,from,toRecipients,ccRecipients,bccRecipients,receivedDateTime,sentDateTime,body,internetMessageId,conversationId,isRead,hasAttachments,bodyPreview",
Expand Down Expand Up @@ -167,6 +164,22 @@
"integrationName": "outlook"
}
},
"nextSteps": {
"success": "update_sync_cursor"
}
},
{
"stepSlug": "update_sync_cursor",
"name": "Update Sync Cursor",
"stepType": "action",
"config": {
"type": "conversation",
"parameters": {
"operation": "update_email_sync_cursor",
"integrationName": "outlook",
"cursor": "{{steps.fetch_new_emails.output.data.result.nextCursor || steps.fetch_latest_email.output.data.result.nextCursor}}"
}
},
"nextSteps": {
"success": "noop"
}
Expand Down
4 changes: 4 additions & 0 deletions services/platform/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,9 +603,11 @@ import type * as workflow_engine_action_defs_conversation_helpers_build_email_me
import type * as workflow_engine_action_defs_conversation_helpers_build_initial_message from "../workflow_engine/action_defs/conversation/helpers/build_initial_message.js";
import type * as workflow_engine_action_defs_conversation_helpers_check_conversation_exists from "../workflow_engine/action_defs/conversation/helpers/check_conversation_exists.js";
import type * as workflow_engine_action_defs_conversation_helpers_check_message_exists from "../workflow_engine/action_defs/conversation/helpers/check_message_exists.js";
import type * as workflow_engine_action_defs_conversation_helpers_constants from "../workflow_engine/action_defs/conversation/helpers/constants.js";
import type * as workflow_engine_action_defs_conversation_helpers_create_conversation from "../workflow_engine/action_defs/conversation/helpers/create_conversation.js";
import type * as workflow_engine_action_defs_conversation_helpers_create_conversation_from_email from "../workflow_engine/action_defs/conversation/helpers/create_conversation_from_email.js";
import type * as workflow_engine_action_defs_conversation_helpers_create_conversation_from_sent_email from "../workflow_engine/action_defs/conversation/helpers/create_conversation_from_sent_email.js";
import type * as workflow_engine_action_defs_conversation_helpers_email_sync_cursor from "../workflow_engine/action_defs/conversation/helpers/email_sync_cursor.js";
import type * as workflow_engine_action_defs_conversation_helpers_find_or_create_customer_from_email from "../workflow_engine/action_defs/conversation/helpers/find_or_create_customer_from_email.js";
import type * as workflow_engine_action_defs_conversation_helpers_find_related_conversation from "../workflow_engine/action_defs/conversation/helpers/find_related_conversation.js";
import type * as workflow_engine_action_defs_conversation_helpers_normalize_email from "../workflow_engine/action_defs/conversation/helpers/normalize_email.js";
Expand Down Expand Up @@ -1458,9 +1460,11 @@ declare const fullApi: ApiFromModules<{
"workflow_engine/action_defs/conversation/helpers/build_initial_message": typeof workflow_engine_action_defs_conversation_helpers_build_initial_message;
"workflow_engine/action_defs/conversation/helpers/check_conversation_exists": typeof workflow_engine_action_defs_conversation_helpers_check_conversation_exists;
"workflow_engine/action_defs/conversation/helpers/check_message_exists": typeof workflow_engine_action_defs_conversation_helpers_check_message_exists;
"workflow_engine/action_defs/conversation/helpers/constants": typeof workflow_engine_action_defs_conversation_helpers_constants;
"workflow_engine/action_defs/conversation/helpers/create_conversation": typeof workflow_engine_action_defs_conversation_helpers_create_conversation;
"workflow_engine/action_defs/conversation/helpers/create_conversation_from_email": typeof workflow_engine_action_defs_conversation_helpers_create_conversation_from_email;
"workflow_engine/action_defs/conversation/helpers/create_conversation_from_sent_email": typeof workflow_engine_action_defs_conversation_helpers_create_conversation_from_sent_email;
"workflow_engine/action_defs/conversation/helpers/email_sync_cursor": typeof workflow_engine_action_defs_conversation_helpers_email_sync_cursor;
"workflow_engine/action_defs/conversation/helpers/find_or_create_customer_from_email": typeof workflow_engine_action_defs_conversation_helpers_find_or_create_customer_from_email;
"workflow_engine/action_defs/conversation/helpers/find_related_conversation": typeof workflow_engine_action_defs_conversation_helpers_find_related_conversation;
"workflow_engine/action_defs/conversation/helpers/normalize_email": typeof workflow_engine_action_defs_conversation_helpers_normalize_email;
Expand Down
23 changes: 23 additions & 0 deletions services/platform/convex/integrations/internal_mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,29 @@ export const createIntegration = internalMutation({
},
});

export const patchIntegrationMetadata = internalMutation({
args: {
integrationId: v.id('integrations'),
metadataPatch: jsonRecordValidator,
},
handler: async (ctx, args) => {
const integration = await ctx.db.get(args.integrationId);
if (!integration) {
throw new Error('Integration not found');
}

const currentMetadata =
integration.metadata &&
typeof integration.metadata === 'object' &&
!Array.isArray(integration.metadata)
? integration.metadata
: {};
const merged = { ...currentMetadata, ...args.metadataPatch };

await ctx.db.patch(args.integrationId, { metadata: merged });
Comment on lines +95 to +103
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't silently replace non-object metadata.

createIntegration still accepts arbitrary JSON metadata, so an existing scalar or array value is legal here. Falling back to {} means the first cursor update erases that stored value and replaces it with an object. Fail fast here, or narrow integrations metadata to records before merging.

Proposed guard
-    const currentMetadata =
-      integration.metadata &&
-      typeof integration.metadata === 'object' &&
-      !Array.isArray(integration.metadata)
-        ? integration.metadata
-        : {};
+    if (
+      integration.metadata !== undefined &&
+      (typeof integration.metadata !== 'object' ||
+        integration.metadata === null ||
+        Array.isArray(integration.metadata))
+    ) {
+      throw new Error('Integration metadata must be an object to patch');
+    }
+
+    const currentMetadata = integration.metadata ?? {};
     const merged = { ...currentMetadata, ...args.metadataPatch };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@services/platform/convex/integrations/internal_mutations.ts` around lines 95
- 103, The code currently coerces non-object integration.metadata to {} which
silently erases valid scalar/array metadata; update the patch logic in the
integration update flow (around currentMetadata, merged, ctx.db.patch,
args.integrationId, args.metadataPatch) to validate the existing
integration.metadata before merging: if integration.metadata exists and is not a
plain object (typeof === 'object' && !Array.isArray), throw a descriptive error
(or return a failed result) instead of falling back to {} so callers fail fast;
alternatively, if you prefer automatic normalization, explicitly narrow/convert
only when integration.metadata is a plain record and otherwise treat it as
immutable and write only args.metadataPatch (or reject the operation) before
calling ctx.db.patch.

},
});

export const updateIntegration = internalMutation({
args: {
integrationId: v.id('integrations'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ import type { ConversationStatus, ConversationPriority } from './helpers/types';
import {
jsonRecordValidator,
jsonValueValidator,
} from '../../../lib/validators/json';
} from '../../../lib/shared/schemas/utils/json_value';
import { createConversation } from './helpers/create_conversation';
import { createConversationFromEmail } from './helpers/create_conversation_from_email';
import { createConversationFromSentEmail } from './helpers/create_conversation_from_sent_email';
import {
getEmailSyncCursor,
updateEmailSyncCursor,
} from './helpers/email_sync_cursor';
import { queryConversationMessages } from './helpers/query_conversation_messages';
import { queryLatestMessageByDeliveryState } from './helpers/query_latest_message_by_delivery_state';
import { updateConversations } from './helpers/update_conversations';
Expand Down Expand Up @@ -113,12 +117,21 @@ type ConversationActionParams =
priority?: ConversationPriority;
type?: string;
integrationName?: string;
}
| {
operation: 'get_email_sync_cursor';
integrationName: string;
}
| {
operation: 'update_email_sync_cursor';
integrationName: string;
cursor: Record<string, unknown>;
Comment on lines +121 to +128
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n -C3 '\bnextCursor\b|\bemailSyncCursor\b|\bget_email_sync_cursor\b|\bupdate_email_sync_cursor\b'

Repository: tale-project/tale

Length of output: 15642


🏁 Script executed:

#!/bin/bash
# Search for jsonValueValidator and other validators to understand available options
rg -n 'jsonValueValidator|jsonRecordValidator' --type ts

Repository: tale-project/tale

Length of output: 32225


🏁 Script executed:

#!/bin/bash
# Search for IMAP and other integration connectors that might produce cursors
fd -e ts -path '*/integrations/*' -o -path '*/imap/*' | head -20

Repository: tale-project/tale

Length of output: 229


🏁 Script executed:

#!/bin/bash
# Look for cursor or nextCursor definitions in IMAP-related files
find . -type f -name "*imap*" \( -name "*.ts" -o -name "*.json" \) | head -20

Repository: tale-project/tale

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Search for IMAP integration files
find . -type f -name "*imap*" | grep -E '\.(ts|js|json)$'

Repository: tale-project/tale

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Look at the metadata handling precedent mentioned
sed -n '70,85p' services/platform/convex/integrations/internal_mutations.ts

Repository: tale-project/tale

Length of output: 563


🏁 Script executed:

#!/bin/bash
# Search for all cursor/nextCursor producers in integrations
rg -n 'nextCursor.*=' examples/integrations/ --type ts -B2 -A2

Repository: tale-project/tale

Length of output: 519


🏁 Script executed:

#!/bin/bash
# Search for IMAP files in services/platform
find services/platform -type f -name "*imap*" 2>/dev/null | head -20

Repository: tale-project/tale

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Check the updateEmailSyncCursor implementation to see if it validates cursor at runtime
rg -n 'updateEmailSyncCursor' services/platform/convex/workflow_engine/action_defs/conversation/helpers/ -A 15

Repository: tale-project/tale

Length of output: 2002


🏁 Script executed:

#!/bin/bash
# Verify the json validator definitions to understand their scope
cat services/platform/convex/lib/shared/schemas/utils/json_value.ts

Repository: tale-project/tale

Length of output: 877


Use jsonValueValidator for truly provider-opaque cursor.

The cursor is documented as "provider-opaque" but the validator restricts it to objects. This contradicts the semantic intent and locks the API to one shape. For consistency with how provider metadata is handled elsewhere (see integrations/internal_mutations.ts line 77, which uses jsonValueValidator for opaque provider data), use jsonValueValidator here. Also widen the TypeScript type from Record<string, unknown> to unknown to match.

Also applies to: 207-217

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@services/platform/convex/workflow_engine/action_defs/conversation/conversation_action.ts`
around lines 121 - 128, Update the cursor typing and validation to be
provider-opaque: change the TypeScript type for the cursor properties in the
union cases inside conversation_action (the branches with operation
'update_email_sync_cursor' and the similar case around lines 207-217) from
Record<string, unknown> to unknown, and replace the current object-specific
validator with jsonValueValidator so the cursor can be any JSON value; ensure
you update both occurrences (the 'update_email_sync_cursor' case and the second
similar block) to use jsonValueValidator and the unknown cursor type for
consistency with integrations/internal_mutations.ts.

};

export const conversationAction: ActionDefinition<ConversationActionParams> = {
type: 'conversation',
title: 'Conversation Operation',
description: `Execute conversation-specific operations (create, get_by_id, query_messages, query_latest_message_by_delivery_state, update, create_from_email, create_from_sent_email). organizationId is automatically read from workflow context variables.
description: `Execute conversation-specific operations (create, get_by_id, query_messages, query_latest_message_by_delivery_state, update, create_from_email, create_from_sent_email, get_email_sync_cursor, update_email_sync_cursor). organizationId is automatically read from workflow context variables.

FOR EMAIL WORKFLOWS:
When creating outbound email conversations, include these fields in the metadata object:
Expand Down Expand Up @@ -191,6 +204,17 @@ See 'product_recommendation_email' predefined workflow for complete example.`,
type: v.optional(v.string()),
integrationName: v.optional(v.string()),
}),
// get_email_sync_cursor: Read provider-opaque sync cursor from integration metadata
v.object({
operation: v.literal('get_email_sync_cursor'),
integrationName: v.string(),
}),
// update_email_sync_cursor: Write provider-opaque sync cursor to integration metadata
v.object({
operation: v.literal('update_email_sync_cursor'),
integrationName: v.string(),
cursor: jsonRecordValidator,
}),
),
async execute(ctx, params, variables) {
// Read and validate organizationId from workflow context variables
Expand Down Expand Up @@ -281,6 +305,22 @@ See 'product_recommendation_email' predefined workflow for complete example.`,
});
}

case 'get_email_sync_cursor': {
return await getEmailSyncCursor(ctx, {
organizationId,
integrationName: params.integrationName,
});
}

case 'update_email_sync_cursor': {
await updateEmailSyncCursor(ctx, {
organizationId,
integrationName: params.integrationName,
cursor: params.cursor,
});
return { success: true };
}

default:
throw new Error(
`Unsupported conversation operation: ${(params as { operation: string }).operation}`,
Expand Down
Loading
Loading