feat(platform): Outlook email sync cursor and normalization #815
…h limits

Add Microsoft Graph (Outlook) message detection and normalization to EmailType, matching the existing Gmail path. Cap email arrays to 20 per batch to stay within Convex action argument size limits, and guard against missing root messageId during conversation creation.
…d integration metadata cursor

Decouple email sync state from conversation messages by storing a provider-opaque cursor in integration metadata. This avoids fragile queries against delivered messages and supports incremental sync with proper cursor advancement.

- Add email_sync_cursor helper with get/update operations backed by patchIntegrationMetadata internal mutation
- Populate Outlook email headers (message-id, in-reply-to, references) in both connector and normalizer for proper threading
- Return nextCursor from connector list_messages for sync advancement
- Gracefully handle empty email arrays and missing messageId instead of throwing
- Unify sent-email path to use normalizeEmails for provider-agnostic input
- Extract MAX_EMAILS_PER_BATCH into shared constants module
- Extend normalize_email test coverage (bcc, missing fields, mixed arrays)
- Bump workflow version to 1.1.0 with new cursor-based sync steps
📝 Walkthrough
This PR implements cursor-based pagination for Outlook email synchronization, replacing date-based filtering. It adds two new workflow actions (get_email_sync_cursor and update_email_sync_cursor) to manage cursors stored in integration metadata, updates the Outlook connector to support cursor-driven queries and populate message headers, introduces a MAX_EMAILS_PER_BATCH limit for email processing, extends email normalization to support Microsoft Graph/Outlook raw messages, and adds comprehensive test coverage for Outlook email handling.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
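The cursor flow described in the walkthrough can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `Cursor`, `Page`, `CursorStore`, and `syncOnce` are hypothetical names, and the sketch is kept synchronous for brevity, whereas the real workflow actions would be asynchronous.

```typescript
// Hypothetical shapes for illustration; none of these names are taken from the PR's code.
type Cursor = unknown; // provider-opaque: the sync loop never inspects it

interface Page<M> {
  messages: M[];
  nextCursor?: Cursor;
}

interface CursorStore {
  get(): Cursor | undefined;
  update(cursor: Cursor): void;
}

// Runs one sync pass: read the stored cursor, list messages from that point,
// hand them to the caller, then persist the connector's next cursor.
function syncOnce<M>(
  store: CursorStore,
  listMessages: (cursor: Cursor | undefined) => Page<M>,
  handleBatch: (messages: M[]) => void,
): number {
  const page = listMessages(store.get());
  if (page.messages.length > 0) {
    handleBatch(page.messages);
  }
  // Advance only after the batch was handled, so a failure above re-reads the same page.
  if (page.nextCursor !== undefined) {
    store.update(page.nextCursor);
  }
  return page.messages.length;
}
```

Updating the cursor last is the design choice worth noting: if handling throws, the stored cursor is unchanged and the next run retries the same page instead of skipping it.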
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@services/platform/convex/integrations/internal_mutations.ts`:
- Around line 95-103: The code currently coerces non-object integration.metadata
to {} which silently erases valid scalar/array metadata; update the patch logic
in the integration update flow (around currentMetadata, merged, ctx.db.patch,
args.integrationId, args.metadataPatch) to validate the existing
integration.metadata before merging: if integration.metadata exists and is not a
plain object (typeof === 'object' && !Array.isArray), throw a descriptive error
(or return a failed result) instead of falling back to {} so callers fail fast;
alternatively, if you prefer automatic normalization, explicitly narrow/convert
only when integration.metadata is a plain record and otherwise treat it as
immutable and write only args.metadataPatch (or reject the operation) before
calling ctx.db.patch.
In
`@services/platform/convex/workflow_engine/action_defs/conversation/conversation_action.ts`:
- Around line 121-128: Update the cursor typing and validation to be
provider-opaque: change the TypeScript type for the cursor properties in the
union cases inside conversation_action (the branches with operation
'update_email_sync_cursor' and the similar case around lines 207-217) from
Record<string, unknown> to unknown, and replace the current object-specific
validator with jsonValueValidator so the cursor can be any JSON value; ensure
you update both occurrences (the 'update_email_sync_cursor' case and the second
similar block) to use jsonValueValidator and the unknown cursor type for
consistency with integrations/internal_mutations.ts.
In
`@services/platform/convex/workflow_engine/action_defs/conversation/helpers/constants.ts`:
- Around line 2-6: The fixed MAX_EMAILS_PER_BATCH = 20 does not guarantee Convex
mutation arg size limits; change the conversation-creation helpers that use
MAX_EMAILS_PER_BATCH (and the constant itself) to enforce a byte-size budget
instead of a fixed count: compute estimated serialized size per email (or strip
heavy fields like full HTML/body/attachments) and accumulate until reaching a
safe threshold (e.g., slightly under 1MB for ctx.runMutation args), replace
usages of MAX_EMAILS_PER_BATCH with this budget-aware batching logic in the
conversation creation functions so both helpers cannot exceed the mutation
argument size.
In
`@services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts`:
- Around line 203-218: Outlook inbound messages currently lack RFC-2822
threading headers (so hardcoded empty strings break createConversationFromEmail
and findRelatedConversation which expect headers.references / in-reply-to);
update RawOutlookMessage/processing to either (A) use the existing
conversationId as the thread key by populating the same threading
fields/metadata from conversationId and wiring that fallback into
createConversationFromEmail and findRelatedConversation, or (B) if accurate RFC
headers are required, add a follow-up Graph call to fetch internetMessageHeaders
and map their "References" and "In-Reply-To" into the existing headers structure
(internetMessageHeaders) before the rest of the pipeline runs; implement one
approach and ensure the places referencing headers.references / in-reply-to
(createConversationFromEmail, findRelatedConversation) accept the new fallback
so threading logic works for Outlook messages.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ac5d5cce-20ce-4b21-9bd2-792cef1e10f6
⛔ Files ignored due to path filters (1)
services/platform/convex/_generated/api.d.ts is excluded by !**/_generated/**
📒 Files selected for processing (10)
examples/integrations/outlook/connector.ts
examples/workflows/outlook/email-sync.json
services/platform/convex/integrations/internal_mutations.ts
services/platform/convex/workflow_engine/action_defs/conversation/conversation_action.ts
services/platform/convex/workflow_engine/action_defs/conversation/helpers/__tests__/normalize_email.test.ts
services/platform/convex/workflow_engine/action_defs/conversation/helpers/constants.ts
services/platform/convex/workflow_engine/action_defs/conversation/helpers/create_conversation_from_email.ts
services/platform/convex/workflow_engine/action_defs/conversation/helpers/create_conversation_from_sent_email.ts
services/platform/convex/workflow_engine/action_defs/conversation/helpers/email_sync_cursor.ts
services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts
const currentMetadata =
  integration.metadata &&
  typeof integration.metadata === 'object' &&
  !Array.isArray(integration.metadata)
    ? integration.metadata
    : {};
const merged = { ...currentMetadata, ...args.metadataPatch };

await ctx.db.patch(args.integrationId, { metadata: merged });
Don't silently replace non-object metadata.
createIntegration still accepts arbitrary JSON metadata, so an existing scalar or array value is legal here. Falling back to {} means the first cursor update erases that stored value and replaces it with an object. Fail fast here, or narrow integrations metadata to records before merging.
Proposed guard
- const currentMetadata =
- integration.metadata &&
- typeof integration.metadata === 'object' &&
- !Array.isArray(integration.metadata)
- ? integration.metadata
- : {};
+ if (
+ integration.metadata !== undefined &&
+ (typeof integration.metadata !== 'object' ||
+ integration.metadata === null ||
+ Array.isArray(integration.metadata))
+ ) {
+ throw new Error('Integration metadata must be an object to patch');
+ }
+
+ const currentMetadata = integration.metadata ?? {};
const merged = { ...currentMetadata, ...args.metadataPatch };
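For reference, the fail-fast behaviour suggested above can be isolated into a small pure function. This is a sketch under assumptions: `isPlainRecord` and `mergeMetadata` are hypothetical helper names, not functions from the repository, and the database patch itself is left out.

```typescript
// Hypothetical helpers illustrating the suggested guard; not code from the PR.
type Metadata = Record<string, unknown>;

// True only for non-null, non-array objects.
function isPlainRecord(value: unknown): value is Metadata {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Shallow-merges a patch into existing metadata, refusing to overwrite
// scalar, array, or null metadata instead of silently replacing it with {}.
function mergeMetadata(existing: unknown, patch: Metadata): Metadata {
  if (existing === undefined) {
    return { ...patch };
  }
  if (!isPlainRecord(existing)) {
    throw new Error('Integration metadata must be an object to patch');
  }
  return { ...existing, ...patch };
}
```

Keeping the check in a pure function makes the three cases (absent, object, anything else) easy to unit test without a database context.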
| {
    operation: 'get_email_sync_cursor';
    integrationName: string;
  }
| {
    operation: 'update_email_sync_cursor';
    integrationName: string;
    cursor: Record<string, unknown>;
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n -C3 '\bnextCursor\b|\bemailSyncCursor\b|\bget_email_sync_cursor\b|\bupdate_email_sync_cursor\b'
Repository: tale-project/tale
Length of output: 15642
🏁 Script executed:
#!/bin/bash
# Search for jsonValueValidator and other validators to understand available options
rg -n 'jsonValueValidator|jsonRecordValidator' --type ts
Repository: tale-project/tale
Length of output: 32225
🏁 Script executed:
#!/bin/bash
# Search for IMAP and other integration connectors that might produce cursors
fd -e ts -path '*/integrations/*' -o -path '*/imap/*' | head -20
Repository: tale-project/tale
Length of output: 229
🏁 Script executed:
#!/bin/bash
# Look for cursor or nextCursor definitions in IMAP-related files
find . -type f -name "*imap*" \( -name "*.ts" -o -name "*.json" \) | head -20
Repository: tale-project/tale
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Search for IMAP integration files
find . -type f -name "*imap*" | grep -E '\.(ts|js|json)$'
Repository: tale-project/tale
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Look at the metadata handling precedent mentioned
sed -n '70,85p' services/platform/convex/integrations/internal_mutations.ts
Repository: tale-project/tale
Length of output: 563
🏁 Script executed:
#!/bin/bash
# Search for all cursor/nextCursor producers in integrations
rg -n 'nextCursor.*=' examples/integrations/ --type ts -B2 -A2
Repository: tale-project/tale
Length of output: 519
🏁 Script executed:
#!/bin/bash
# Search for IMAP files in services/platform
find services/platform -type f -name "*imap*" 2>/dev/null | head -20
Repository: tale-project/tale
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Check the updateEmailSyncCursor implementation to see if it validates cursor at runtime
rg -n 'updateEmailSyncCursor' services/platform/convex/workflow_engine/action_defs/conversation/helpers/ -A 15
Repository: tale-project/tale
Length of output: 2002
🏁 Script executed:
#!/bin/bash
# Verify the json validator definitions to understand their scope
cat services/platform/convex/lib/shared/schemas/utils/json_value.ts
Repository: tale-project/tale
Length of output: 877
Use jsonValueValidator for truly provider-opaque cursor.
The cursor is documented as "provider-opaque" but the validator restricts it to objects. This contradicts the semantic intent and locks the API to one shape. For consistency with how provider metadata is handled elsewhere (see integrations/internal_mutations.ts line 77, which uses jsonValueValidator for opaque provider data), use jsonValueValidator here. Also widen the TypeScript type from Record<string, unknown> to unknown to match.
Also applies to: 207-217
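To make "any JSON value" concrete, here is a plain TypeScript stand-in for the shape such a cursor would be allowed to take. It is only an illustration: the repository's `jsonValueValidator` is a schema validator whose definition is not reproduced here, and `JsonValue` and `isJsonValue` below are hypothetical names.

```typescript
// Hypothetical runtime check mirroring a "JSON value" contract; not the repo's validator.
type JsonValue =
  | null
  | boolean
  | number
  | string
  | JsonValue[]
  | { [key: string]: JsonValue };

// Accepts scalars, arrays, and objects whose members are themselves JSON values.
// Assumes acyclic input; class instances are not distinguished from plain objects.
function isJsonValue(value: unknown): value is JsonValue {
  if (value === null) return true;
  switch (typeof value) {
    case 'string':
    case 'boolean':
      return true;
    case 'number':
      return isFinite(value); // NaN and Infinity are not representable in JSON
    case 'object': {
      if (Array.isArray(value)) return value.every(isJsonValue);
      const record = value as Record<string, unknown>;
      return Object.keys(record).every((key) => isJsonValue(record[key]));
    }
    default:
      return false; // undefined, functions, symbols, bigint
  }
}
```

With a contract like this, one provider could store a bare string as its cursor while another stores a structured object, and the sync actions would not need to change.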
 * Maximum number of emails to process in a single conversation creation batch.
 * Keeps oldest-first to preserve the root email needed for threading.
 * Constrained by Convex action argument size limits (~1MB for ctx.runMutation args).
 */
export const MAX_EMAILS_PER_BATCH = 20;
A fixed count does not actually enforce the payload budget.
20 is only a heuristic here: a single large HTML thread can still exceed the mutation arg size limit, while small threads get truncated unnecessarily. Because both conversation-creation helpers now depend on this constant for safety, the cap should be based on estimated serialized byte size (or strip heavyweight fields before mutation), not item count.
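A byte-budget cap along those lines might look like the sketch below. The names `utf8Length`, `estimateBytes`, `takeBatchWithinBudget`, and the 900,000-byte default are assumptions chosen for illustration (a margin under the roughly 1MB figure quoted in the constant's comment); none of them come from the PR.

```typescript
// Hypothetical budget-aware batching; names and the default budget are assumptions.
const MAX_BATCH_BYTES = 900000;

// Counts UTF-8 bytes of a string, assuming well-formed UTF-16 input.
function utf8Length(text: string): number {
  let bytes = 0;
  for (let i = 0; i < text.length; i++) {
    const code = text.charCodeAt(i);
    if (code < 0x80) bytes += 1;
    else if (code < 0x800) bytes += 2;
    else if (code >= 0xd800 && code <= 0xdbff) {
      bytes += 4; // surrogate pair encodes one 4-byte code point
      i++;
    } else bytes += 3;
  }
  return bytes;
}

// Estimates the serialized size of one value as JSON.
function estimateBytes(value: unknown): number {
  return utf8Length(JSON.stringify(value));
}

// Takes emails oldest-first until the estimated payload would exceed the budget.
// The first (root) email is always kept, since threading depends on it; a caller
// would still need to strip heavy fields if that single email is over budget.
function takeBatchWithinBudget<T>(emails: T[], maxBytes: number = MAX_BATCH_BYTES): T[] {
  const batch: T[] = [];
  let used = 2; // the surrounding "[" and "]"
  for (const email of emails) {
    const size = estimateBytes(email) + 1; // plus a separating comma
    if (batch.length > 0 && used + size > maxBytes) break;
    batch.push(email);
    used += size;
  }
  return batch;
}
```

Unlike a fixed count of 20, this lets many small emails through in one batch while cutting a thread of large HTML bodies short before the argument limit is reached.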
interface RawOutlookMessage {
  id?: string;
  internetMessageId?: string;
  conversationId?: string;
  subject?: string;
  receivedDateTime?: string;
  sentDateTime?: string;
  isRead?: boolean;
  hasAttachments?: boolean;
  from?: { emailAddress?: OutlookEmailAddress };
  toRecipients?: OutlookRecipient[];
  ccRecipients?: OutlookRecipient[];
  bccRecipients?: OutlookRecipient[];
  body?: { contentType?: string; content?: string };
  attachments?: OutlookAttachment[];
}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n -C3 '\binternetMessageHeaders\b|\binternetMessageId\b|\bIn-Reply-To\b|\bReferences\b|\bmessage-id\b'
Repository: tale-project/tale
Length of output: 50373
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if internetMessageHeaders is used anywhere in the workflow engine
rg -l 'internetMessageHeaders' services/platform/convex/
# Also check what fields the Outlook connector is actually selecting/requesting
rg -A5 '\$select.*internetMessageId' services/platform/convex/
Repository: tale-project/tale
Length of output: 43
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check what fields email-sync workflow requests from Outlook
cat examples/workflows/outlook/email-sync.json | grep -A2 -B2 '"select"'
# Check if there's any mention of threading headers in Outlook integration docs or comments
rg -i 'internetmessageheaders|threading|in-reply-to' services/platform/convex/workflow_engine/action_defs/conversation/ --type ts
# Verify what the sent message path does for Outlook (vs. what's mentioned in the comment)
rg -B5 -A10 'createReply|sendReply' examples/integrations/outlook/connector.ts | head -40
Repository: tale-project/tale
Length of output: 3951
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if there's any discussion or TODOs about Outlook threading headers
rg -i 'todo|fixme|xxx|hack' services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts
# Check the full normalization function for Outlook to see the complete picture
sed -n '230,280p' services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts
# Verify the Gmail connector comparison - does it also NOT fetch headers from raw API?
sed -n '140,190p' services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts
Repository: tale-project/tale
Length of output: 43
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check the file structure and size
wc -l services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts
# View the complete normalizeOutlookMessage function
rg -A30 'function normalizeOutlookMessage' services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts
# View the complete normalizeGmailMessage function to compare
rg -A25 'function normalizeGmailMessage' services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts
Repository: tale-project/tale
Length of output: 157
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# View the entire file to understand both normalization paths
cat -n services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts | sed -n '140,280p'
Repository: tale-project/tale
Length of output: 5508
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if Microsoft Graph API documentation or examples mention internetMessageHeaders
rg 'internetMessageHeaders' . --type ts --type json
# Search for any comments or documentation about why Outlook headers are not extracted
rg -B10 "'in-reply-to': ''" services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts
# Check find_related_conversation to see how it handles empty headers
sed -n '1,50p' services/platform/convex/workflow_engine/action_defs/conversation/helpers/find_related_conversation.ts
Repository: tale-project/tale
Length of output: 43
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if find_related_conversation actually depends on the headers being populated
cat services/platform/convex/workflow_engine/action_defs/conversation/helpers/find_related_conversation.ts
Repository: tale-project/tale
Length of output: 929
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify that create_conversation_from_email actually fails when references is empty
sed -n '100,130p' services/platform/convex/workflow_engine/action_defs/conversation/helpers/create_conversation_from_email.ts
# Check if there's any fallback logic when headers are empty
sed -n '105,225p' services/platform/convex/workflow_engine/action_defs/conversation/helpers/create_conversation_from_email.ts | head -60
Repository: tale-project/tale
Length of output: 3309
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if there are any comments or documentation about Outlook limitations
rg -B5 -A5 'Outlook|Graph' services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts | head -50
# Check the example Outlook integration comment about threading
sed -n '665,675p' examples/integrations/outlook/connector.ts
# Search for any discussion about the Outlook header limitation across the codebase
rg -i 'outlook.*header|graph.*header' services/platform/convex/ examples/ --type ts
Repository: tale-project/tale
Length of output: 2061
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for any alternative approaches or discussion of Outlook threading
rg -i 'threading|reply.*outlook|createreply' services/platform/convex/workflow_engine/action_defs/conversation/ --type ts
# Check if there's a way to fetch additional properties from Graph
rg '\$expand|\$properties|singleValueExtendedProperties|multiValueExtendedProperties' examples/integrations/outlook/
Repository: tale-project/tale
Length of output: 860
🏁 Script executed:
#!/bin/bash
set -euo pipefill
# Check if there's any documentation or code about extended properties in Outlook Graph
rg -i 'extended.*propert|mapi' examples/integrations/outlook/
# Verify the complete picture: is internetMessageHeaders actually available in Microsoft Graph at all?
# Check if the connector makes any secondary calls or has any workarounds
grep -n 'createReply\|check_delivery\|fetch\|GET.*messages' examples/integrations/outlook/connector.ts | head -20
Repository: tale-project/tale
Length of output: 114
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if there's any documentation or code about extended properties in Outlook Graph
rg -i 'extended.*propert|mapi' examples/integrations/outlook/
# Verify the complete picture: is internetMessageHeaders actually available in Microsoft Graph at all?
# Check if the connector makes any secondary calls or has any workarounds
grep -n 'createReply\|check_delivery\|fetch\|GET.*messages' examples/integrations/outlook/connector.ts | head -20
Repository: tale-project/tale
Length of output: 43
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Final check: verify what Microsoft Graph standard message properties include
# by checking Microsoft documentation references or comments in the codebase
rg -i 'microsoft.*graph|graph.*api|outlook.*api' services/platform/convex/workflow_engine/action_defs/conversation/helpers/normalize_email.ts
# Check if conversationId is being used as an alternative for grouping Outlook threads
rg -n 'conversationId' services/platform/convex/workflow_engine/action_defs/conversation/helpers/*.ts | head -20
Repository: tale-project/tale
Length of output: 3364
Threading headers are critical but missing for Outlook inbound messages.
createConversationFromEmail roots threads by extracting the first message ID from headers.references (lines 108–109), and findRelatedConversation depends on in-reply-to and references to link related conversations. Currently, Outlook messages hardcode these headers to empty strings (lines 259–262), preventing any threading logic from working.
However, the proposed fix (adding internetMessageHeaders to RawOutlookMessage) has a blocker: Microsoft Graph's standard message queries do not expose threading headers in the raw message object. The example Outlook workflow's $select parameter does not request internetMessageHeaders, and no production code attempts to use it.
The comment in the example connector stating "Graph sets threading headers automatically" applies only to createReply (outbound messages), not to message synchronization.
Consider either:
- Using Outlook's native `conversationId` property (already available in `RawOutlookMessage`) to group threads instead of RFC 2822 headers
- Making a separate Graph API call to fetch `internetMessageHeaders` if the connector architecture supports it
- Documenting that Outlook inbound messages cannot satisfy the threading contract with the current Gmail-centric header design
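The first two options can be combined, as in the rough sketch below: read `In-Reply-To` and `References` from `internetMessageHeaders` when the connector has requested that property, and fall back to `conversationId` otherwise. The `OutlookLike` and `ThreadingInfo` shapes, the `threadKey` field, and both function names are hypothetical; how `createConversationFromEmail` and `findRelatedConversation` would consume the result is not shown.

```typescript
// Hypothetical shapes for illustration; only the Graph property names are real.
interface GraphHeader {
  name: string;
  value: string;
}

interface OutlookLike {
  internetMessageId?: string;
  conversationId?: string;
  internetMessageHeaders?: GraphHeader[]; // present only if the query selected it
}

interface ThreadingInfo {
  messageId: string;
  inReplyTo: string;
  references: string;
  threadKey: string; // assumed field: what a thread lookup would key on
}

// Header names are case-insensitive, so compare in lower case.
function findHeader(headers: GraphHeader[] | undefined, name: string): string {
  const wanted = name.toLowerCase();
  for (const header of headers || []) {
    if (header.name.toLowerCase() === wanted) return header.value;
  }
  return '';
}

// Prefers RFC headers when available; otherwise groups by Outlook's conversationId.
function extractThreading(message: OutlookLike): ThreadingInfo {
  const messageId = message.internetMessageId || '';
  const inReplyTo = findHeader(message.internetMessageHeaders, 'In-Reply-To');
  const references = findHeader(message.internetMessageHeaders, 'References');
  const firstReference = references.split(/\s+/).filter((id) => id.length > 0)[0];
  const threadKey = firstReference || inReplyTo || message.conversationId || messageId;
  return { messageId, inReplyTo, references, threadKey };
}
```

One caveat with this fallback: a `conversationId` and an RFC message ID live in different namespaces, so a thread keyed one way would not match a later message keyed the other way unless both values are stored.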
Switch import from permissive v.any() validator (lib/validators/json) to strict v.record() validator (lib/shared/schemas/utils/json_value) for consistency with internal_mutations.ts.
Summary
- …`email_sync_cursor` helper + `patchIntegrationMetadata` mutation), enabling reliable incremental email sync
- …`normalizeEmails`
- …`MAX_EMAILS_PER_BATCH` into shared constants; return `nextCursor` from connector `list_messages`

Test plan
- …`normalize_email.test.ts` with coverage for Outlook headers, bcc, missing fields, mixed provider arrays, and edge cases

🤖 Generated with Claude Code