feat(composio): native Rust providers for per-toolkit profile, sync, triggers (#509)
Conversation
…nctionality
- Bump Composio version to 0.52.5 in Cargo.lock.
- Introduce periodic synchronization for Composio connections, allowing regular updates based on defined intervals.
- Register both Composio trigger and connection-created subscribers to handle events.
- Implement a new periodic sync scheduler that manages sync operations for active connections, improving data freshness and integration reliability.
- Enhance Composio provider implementations to support user-profile fetching and sync operations.
No actionable comments were generated in the recent review. 🎉
📝 Walkthrough

Adds a Composio provider framework (registry, provider trait, Gmail/Notion providers), event-bus subscribers for triggers and connection lifecycle, RPC ops/schemas for profile and sync, and a Once-guarded process-wide periodic sync scheduler started during bootstrap.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Bootstrap as Bootstrap (startup)
    participant Config as Config Loader
    participant Periodic as Periodic Scheduler
    participant API as Composio Backend
    participant Registry as Provider Registry
    participant Provider as Provider (Gmail/Notion)
    participant Memory as Memory Store
    Bootstrap->>Registry: init_default_providers()
    Bootstrap->>Periodic: start_periodic_sync()
    Periodic->>Config: load_config_with_timeout()
    Config-->>Periodic: Arc<Config>
    Periodic->>API: ComposioClient.list_connections()
    API-->>Periodic: [connections]
    loop each ACTIVE/CONNECTED connection
        Periodic->>Registry: get_composio_provider(toolkit)
        Registry-->>Periodic: ProviderArc / None
        alt provider found
            Periodic->>Provider: ProviderContext(connection_id) -> sync(Periodic)
            Provider->>API: execute_tool(...)
            API-->>Provider: tool response
            Provider->>Memory: persist snapshot (best-effort)
            Memory-->>Provider: ok / err
            Provider-->>Periodic: SyncOutcome / error
            Periodic->>Periodic: record_sync_success() on success
        else provider missing
            Periodic-->>Periodic: skip connection
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/openhuman/composio/bus.rs`:
- Around line 229-239: ComposioConnectionCreated is being emitted immediately
after composio_authorize() returns a connectUrl and the code sleeps a fixed 2s,
which races with the user completing OAuth; replace the fixed sleep by polling
the backend for the connection record until its status is ACTIVE/CONNECTED (or
until a reasonable timeout), using the existing connection lookup/get method you
have (e.g. a ComposioConnection service method that returns the connection by
id), apply a short exponential backoff between polls with an overall timeout,
and only call provider.on_connection_created() after you observe the
ACTIVE/CONNECTED state (or abort and log on timeout).
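The polling-with-backoff loop this comment asks for can be sketched std-only. `wait_until_active`, `ConnState`, and the injected `fetch_status`/`sleep` closures are illustrative stand-ins, not the PR's actual API — the real code would poll `ComposioClient` asynchronously:

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum ConnState { Pending, Active }

/// Poll `fetch_status` with exponential backoff (500ms -> 4s per try,
/// overall deadline) and return true only once ACTIVE is observed.
/// `sleep` is injected so the loop is testable without real waiting.
fn wait_until_active(
    mut fetch_status: impl FnMut() -> ConnState,
    mut sleep: impl FnMut(Duration),
    deadline: Duration,
) -> bool {
    let mut delay = Duration::from_millis(500);
    let max_delay = Duration::from_secs(4);
    let mut elapsed = Duration::ZERO;
    loop {
        if fetch_status() == ConnState::Active {
            return true;
        }
        if elapsed >= deadline {
            return false; // caller should log and abort the initial sync
        }
        sleep(delay);
        elapsed += delay;
        delay = (delay * 2).min(max_delay);
    }
}

fn main() {
    // Connection becomes ACTIVE on the third poll.
    let mut polls = 0;
    let ok = wait_until_active(
        || { polls += 1; if polls >= 3 { ConnState::Active } else { ConnState::Pending } },
        |_| {},
        Duration::from_secs(60),
    );
    assert!(ok);
    assert_eq!(polls, 3);
    println!("became active after {polls} polls");
}
```

Only after this returns `true` would the caller invoke `provider.on_connection_created()`.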
In `@src/openhuman/composio/ops.rs`:
- Around line 303-308: The current parse_sync_reason function silently coerces
unknown strings to SyncReason::Manual; change it to fail-fast by returning a
Result (e.g., fn parse_sync_reason(raw: Option<&str>) -> Result<SyncReason,
ParseSyncReasonError>) or Option and return an Err/None for unknown values
instead of mapping to Manual; implement explicit matches for "periodic" and
"connection_created" that return Ok(SyncReason::Periodic) /
Ok(SyncReason::ConnectionCreated) and return Err for any other string (including
typos), and update any callers such as composio_sync to propagate or convert
that error (don't leave callers assuming it never fails). Ensure the new error
type/message clearly indicates an unrecognized sync reason.
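A minimal sketch of the fail-fast parser the comment describes; the `String` error type and the choice to treat `None` as `Manual` are assumptions, not the crate's actual types:

```rust
#[derive(Debug, PartialEq)]
enum SyncReason { Manual, Periodic, ConnectionCreated }

/// Fail-fast parser: unknown strings become an error instead of being
/// silently coerced to Manual. `None` still means an explicit manual call
/// (an assumption for this sketch).
fn parse_sync_reason(raw: Option<&str>) -> Result<SyncReason, String> {
    match raw {
        None | Some("manual") => Ok(SyncReason::Manual),
        Some("periodic") => Ok(SyncReason::Periodic),
        Some("connection_created") => Ok(SyncReason::ConnectionCreated),
        Some(other) => Err(format!("unrecognized sync reason: {other:?}")),
    }
}

fn main() {
    assert_eq!(parse_sync_reason(Some("periodic")), Ok(SyncReason::Periodic));
    assert_eq!(parse_sync_reason(None), Ok(SyncReason::Manual));
    assert!(parse_sync_reason(Some("periodoc")).is_err()); // typo is rejected
    println!("ok");
}
```

Callers such as `composio_sync` would then propagate the `Err` with `?` rather than assuming the parse never fails.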
In `@src/openhuman/composio/periodic.rs`:
- Around line 37-50: start_periodic_sync currently spawns a new scheduler each
call, causing duplicate schedulers when invoked from multiple startup paths; add
an internal idempotency guard (e.g., a static OnceLock or std::sync::Once)
inside start_periodic_sync so the body that tokio::spawn(async move { ... })
runs only once and subsequent calls return immediately. Locate
start_periodic_sync and introduce a static guard (like static STARTED:
OnceLock<()> = OnceLock::new() or static START: Once = Once::new()) and wrap the
spawn logic so the spawn is only executed when the guard is first
initialized/called, leaving the existing tokio::spawn block and logging
unchanged.
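The suggested idempotency guard can be illustrated with `std::sync::Once`. Here a counter stands in for the `tokio::spawn` body, and plain threads simulate the two startup paths racing into the function:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Once;

static START: Once = Once::new();
static SPAWNS: AtomicUsize = AtomicUsize::new(0);

/// Idempotent entry point: however many startup paths call this,
/// the scheduler body runs exactly once. The real code would wrap
/// a `tokio::spawn` here; the counter stands in for the spawn.
fn start_periodic_sync() {
    START.call_once(|| {
        SPAWNS.fetch_add(1, Ordering::SeqCst);
        // tokio::spawn(async move { run_loop().await }) would go here
    });
}

fn main() {
    // Simulate multiple startup paths (and racing threads) calling in.
    let handles: Vec<_> = (0..4).map(|_| std::thread::spawn(start_periodic_sync)).collect();
    for h in handles { h.join().unwrap(); }
    start_periodic_sync();
    assert_eq!(SPAWNS.load(Ordering::SeqCst), 1);
    println!("scheduler spawned once");
}
```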
In `@src/openhuman/composio/providers/gmail.rs`:
- Around line 127-131: The success log currently includes full PII by passing
profile.email to tracing::info; change the log to avoid raw emails—use
tracing::info with connection_id = ?profile.connection_id and either a redacted
flag (e.g., has_email = profile.email.is_some()), a masked value (e.g.,
email_redacted = "<redacted>"), or a derived non-PII indicator (e.g.,
email_domain_present = profile.email.map(|e|
e.split('@').nth(1).is_some()).unwrap_or(false)); update the tracing::info call
around the "[composio:gmail] fetched user profile" message to emit only those
non-PII indicators instead of profile.email.
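One way to derive the suggested non-PII indicators; `email_log_fields` is a hypothetical helper, and the commented-out `tracing` call only shows where its output would go:

```rust
/// Derive non-PII log fields from an optional email: a presence flag
/// plus the domain only (never the local part before '@').
fn email_log_fields(email: Option<&str>) -> (bool, Option<String>) {
    let has_email = email.is_some();
    let domain = email
        .and_then(|e| e.split('@').nth(1))
        .map(|d| d.to_string());
    (has_email, domain)
}

fn main() {
    let (has_email, domain) = email_log_fields(Some("alice@example.com"));
    assert!(has_email);
    assert_eq!(domain.as_deref(), Some("example.com"));
    // The tracing call would then emit only these derived fields, e.g.:
    // tracing::info!(has_email, email_domain = ?domain, "[composio:gmail] fetched user profile");
    let (has_email, domain) = email_log_fields(None);
    assert!(!has_email && domain.is_none());
    println!("redacted fields only");
}
```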
In `@src/openhuman/composio/providers/mod.rs`:
- Around line 203-207: The default trait implementation of on_connection_created
is currently logging PII by including profile.display_name and profile.email in
the tracing::info! call; change the logging to avoid direct PII exposure—either
remove these fields from the tracing::info! invocation or replace them with
non-sensitive identifiers (e.g., profile.id, a hashed/partial redaction of
display_name/email, or a constant like "<redacted>") and update the log message
to provide context without PII; locate the tracing::info! call in
on_connection_created and modify it to only log safe identifiers or redacted
placeholders.
In `@src/openhuman/composio/providers/notion.rs`:
- Around line 127-133: The comment for the Notion sync says it fetches both
pages and databases but the actual request builds args with a filter of
"property": "object", "value": "page" (in the NOTION_FETCH_DATA request via the
args variable), which only returns pages; either update the comment to say it
fetches only pages or change the filter to include both objects (e.g., remove
the filter or broaden it to allow "page" and "database") so the behavior matches
the comment; modify the comment or the args construction in the function that
builds args to keep them consistent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 42c0f969-910d-4941-80d1-47a9961e9c85
📒 Files selected for processing (11)
- src/core/jsonrpc.rs
- src/openhuman/channels/runtime/startup.rs
- src/openhuman/composio/bus.rs
- src/openhuman/composio/mod.rs
- src/openhuman/composio/ops.rs
- src/openhuman/composio/periodic.rs
- src/openhuman/composio/providers/gmail.rs
- src/openhuman/composio/providers/mod.rs
- src/openhuman/composio/providers/notion.rs
- src/openhuman/composio/providers/registry.rs
- src/openhuman/composio/schemas.rs
- bus: poll list_connections with exponential backoff (500ms→4s, 60s cap) until the connection is observed in ACTIVE/CONNECTED before firing on_connection_created, replacing the fixed 2s sleep that raced the OAuth handoff.
- ops: parse_sync_reason now returns Result and rejects unrecognized reason strings instead of silently coercing them to Manual; new unit tests cover the happy path and rejection cases.
- periodic: add a static OnceLock guard inside start_periodic_sync so the scheduler is only spawned once even when both startup paths call into it.
- gmail/providers: redact PII in tracing output — log has_email + email_domain instead of the raw address, and drop display_name + email from the default on_connection_created log.
- notion: align the comment with the actual filter (recently-edited pages, databases intentionally excluded for snapshot size reasons).
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/openhuman/composio/periodic.rs (1)
151-165: Use `debug!` for per-connection scheduler chatter. These start/success logs run for every due connection, so `info!` will dominate steady-state output. Keep lifecycle/failure events at `info`/`warn` and downgrade this hot-path diagnostic detail to `debug!`. As per coding guidelines: "Use log/tracing at debug or trace level for development-oriented diagnostics in Rust."

♻️ Minimal change
```diff
- tracing::info!(
+ tracing::debug!(
      toolkit = %conn.toolkit,
      connection_id = %conn.id,
      interval_secs,
      "[composio:periodic] firing sync"
  );
@@
- tracing::info!(
+ tracing::debug!(
      toolkit = %conn.toolkit,
      connection_id = %conn.id,
      items = outcome.items_ingested,
      elapsed_ms = outcome.elapsed_ms(),
      "[composio:periodic] sync ok"
  );
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/openhuman/composio/periodic.rs` around lines 151 - 165, The per-connection scheduler start and success logs using tracing::info! around the provider.sync(&ctx, SyncReason::Periodic).await call are too noisy; change those tracing::info! invocations (the "firing sync" log before match and the "sync ok" log in the Ok(outcome) arm) to tracing::debug! so routine per-connection diagnostic chatter is downgraded to debug level while leaving lifecycle/failure logs at info/warn levels.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/openhuman/composio/periodic.rs`:
- Around line 73-77: last_sync_at is currently a local HashMap inside run_loop()
so non-periodic syncs (e.g. ComposioConnectionCreatedSubscriber and
ComposioProvider::on_connection_created) cannot update it and the scheduler will
fire immediately; refactor last_sync_at into shared state (e.g., an
Arc<Mutex/parking_lot::Mutex<HashMap<(String,String), Instant>>> or similar)
stored where both run_loop() and the
ComposioConnectionCreatedSubscriber/on_connection_created paths can access it,
change run_loop() to read/update that shared map instead of the local variable,
and update the connection-created handler and
ComposioProvider::on_connection_created() to set the last-success Instant for
the (provider_id, connection_id) key whenever they perform a successful sync so
the periodic ticker respects recent non-periodic syncs.
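The shared-state refactor can be sketched with a process-wide `OnceLock<Mutex<HashMap>>`; `record_sync_success` and `is_due` are illustrative names for what the event-driven and periodic paths would call:

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use std::time::Instant;

/// Process-wide map of (toolkit, connection_id) -> last successful sync,
/// shared by the periodic loop and the event-driven paths.
fn last_sync_map() -> &'static Mutex<HashMap<(String, String), Instant>> {
    static MAP: OnceLock<Mutex<HashMap<(String, String), Instant>>> = OnceLock::new();
    MAP.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Called from trigger/connection-created handlers after a real sync.
fn record_sync_success(toolkit: &str, connection_id: &str) {
    last_sync_map()
        .lock()
        .unwrap()
        .insert((toolkit.to_string(), connection_id.to_string()), Instant::now());
}

/// The periodic ticker only fires when the interval has elapsed since
/// the last success, whichever path recorded it.
fn is_due(toolkit: &str, connection_id: &str, interval_secs: u64) -> bool {
    match last_sync_map()
        .lock()
        .unwrap()
        .get(&(toolkit.to_string(), connection_id.to_string()))
    {
        Some(t) => t.elapsed().as_secs() >= interval_secs,
        None => true, // never synced: fire now
    }
}

fn main() {
    assert!(is_due("gmail", "conn-1", 60));
    record_sync_success("gmail", "conn-1"); // e.g. the connection-created sync just ran
    assert!(!is_due("gmail", "conn-1", 60)); // the next periodic tick now skips it
    assert!(is_due("notion", "conn-2", 60)); // other connections are unaffected
    println!("shared last-sync state respected");
}
```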
In `@src/openhuman/composio/providers/notion.rs`:
- Around line 157-158: persist_snapshot currently swallows store_skill_sync
errors by returning 0, letting sync() mark the sync successful; change
persist_snapshot to return a Result<usize, Error> (or otherwise propagate the
underlying error) and update callers (notably sync() where items_ingested is
used around the let items_ingested = persist_snapshot(ctx, &results).await; and
the similar block at lines ~232-279) to check the Result and return Err when
persistence failed so the scheduler does not advance the last-sync timestamp;
also update any caller that assumed an integer return to unwrap or propagate the
Result appropriately (refer to persist_snapshot, sync(), and store_skill_sync).
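A sketch of the error-propagating shape this comment asks for; the `store` closure stands in for `MemoryClient::store_skill_sync`, whose real signature is not shown here:

```rust
/// Persistence failures now surface as Err instead of a silent 0,
/// so sync() fails and the scheduler does not advance last_sync_at.
fn persist_snapshot(
    items: &[&str],
    store: impl Fn(&str) -> Result<(), String>,
) -> Result<usize, String> {
    let mut persisted = 0;
    for item in items {
        store(item).map_err(|e| format!("store_skill_sync failed for {item}: {e}"))?;
        persisted += 1;
    }
    Ok(persisted)
}

fn sync(items: &[&str], store: impl Fn(&str) -> Result<(), String>) -> Result<usize, String> {
    // Propagate instead of swallowing: an Err here means the sync is
    // reported failed and will be retried on the next tick.
    let items_ingested = persist_snapshot(items, store)?;
    Ok(items_ingested)
}

fn main() {
    assert_eq!(sync(&["a", "b"], |_| Ok(())), Ok(2));
    let failed = sync(&["a", "b"], |i| if i == "b" { Err("disk full".into()) } else { Ok(()) });
    assert!(failed.is_err());
    println!("persistence errors propagate");
}
```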
---
Nitpick comments:
In `@src/openhuman/composio/periodic.rs`:
- Around line 151-165: The per-connection scheduler start and success logs using
tracing::info! around the provider.sync(&ctx, SyncReason::Periodic).await call
are too noisy; change those tracing::info! invocations (the "firing sync" log
before match and the "sync ok" log in the Ok(outcome) arm) to tracing::debug! so
routine per-connection diagnostic chatter is downgraded to debug level while
leaving lifecycle/failure logs at info/warn levels.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 310bbab3-9092-4a50-949e-c44242633bdd
📒 Files selected for processing (6)
- src/openhuman/composio/bus.rs
- src/openhuman/composio/ops.rs
- src/openhuman/composio/periodic.rs
- src/openhuman/composio/providers/gmail.rs
- src/openhuman/composio/providers/mod.rs
- src/openhuman/composio/providers/notion.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- src/openhuman/composio/ops.rs
- src/openhuman/composio/providers/gmail.rs
- Introduced `record_sync_success` function to track successful syncs for connections, preventing redundant re-firing by the periodic scheduler.
- Updated `ComposioTriggerSubscriber` and `ComposioConnectionCreatedSubscriber` to use the new sync-recording mechanism.
- Refactored periodic sync logic to share a process-wide map of last successful sync timestamps across event-driven and periodic paths.
- Adjusted imports in `mod.rs` to include the new sync-recording function.
- Updated the error handling for the `persist_snapshot` function to enhance clarity in error messages, ensuring that failures are logged with specific context for easier debugging.
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/openhuman/composio/providers/notion.rs (1)
117-121: Use `debug` for steady-state sync traces. These entry/exit/trigger logs will fire on every periodic sync and trigger dispatch. Keeping them at `info` will make background churn much louder than actual operational signals. As per coding guidelines, "In Rust, use `log`/`tracing` at `debug` or `trace` level for development-oriented diagnostics."

Also applies to: 168-174, 197-210
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/openhuman/composio/providers/notion.rs` around lines 117 - 121, Change the steady-state/tracing calls that are currently using tracing::info to tracing::debug so periodic sync/trigger logs are emitted at debug level instead of info; specifically update the tracing::info invocation that logs "[composio:notion] sync starting" (the call that includes connection_id = ?ctx.connection_id and reason = reason.as_str()), and make the equivalent changes to the other entry/exit/trigger log blocks referenced around the other spots (the blocks at the ranges you noted) so that all routine sync/dispatch traces use tracing::debug instead of tracing::info.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/openhuman/composio/bus.rs`:
- Around line 180-191: The current code calls
super::periodic::record_sync_success when provider.on_trigger returns Ok(()),
which treats ignored/no-op triggers as successful; change the on_trigger API to
return a success indicator (e.g., Result<bool, String> or
Result<Option<SyncOutcome>, String>) and update the call site in bus.rs to only
call super::periodic::record_sync_success(&toolkit, cid) when the returned value
indicates an actual sync occurred (e.g., true or Some outcome); alternatively,
move the record_sync_success call into the provider implementation immediately
after a successful sync() so providers themselves record real syncs instead of
bus.rs assuming Ok(()) means work was done.
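The `Result<bool, String>` variant of `on_trigger` might look like this; `Provider`, `Gmail`, and `dispatch` are simplified stand-ins for the async trait and the bus code:

```rust
/// Returning Ok(true) only when a real sync ran lets the bus decide
/// whether to record a sync success; Ok(false) means the trigger was
/// deliberately ignored.
trait Provider {
    fn on_trigger(&self, trigger_type: &str) -> Result<bool, String>;
}

struct Gmail;
impl Provider for Gmail {
    fn on_trigger(&self, trigger_type: &str) -> Result<bool, String> {
        match trigger_type {
            "GMAIL_NEW_GMAIL_MESSAGE" => Ok(true), // ran a sync
            _ => Ok(false),                        // no-op: not our trigger
        }
    }
}

/// Bus-side call site: only record success when work actually happened.
fn dispatch(provider: &dyn Provider, trigger_type: &str, recorded: &mut bool) -> Result<(), String> {
    if provider.on_trigger(trigger_type)? {
        *recorded = true; // stands in for super::periodic::record_sync_success(...)
    }
    Ok(())
}

fn main() {
    let gmail = Gmail;
    let mut recorded = false;
    dispatch(&gmail, "GMAIL_NEW_GMAIL_MESSAGE", &mut recorded).unwrap();
    assert!(recorded);
    recorded = false;
    dispatch(&gmail, "SOME_OTHER_TRIGGER", &mut recorded).unwrap();
    assert!(!recorded); // an ignored trigger no longer counts as a sync
    println!("only real syncs recorded");
}
```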
In `@src/openhuman/composio/providers/notion.rs`:
- Around line 243-247: The code currently uses a shared literal "default" when
ctx.connection_id is None, causing collisions; replace the unwrap_or_else(||
"default".to_string()) with generation of a non-colliding identifier (e.g.,
uuid::Uuid::new_v4().to_string() or include ctx.workspace_id if available) and
use that value for connection_label and the title/any snapshot document id
creation (the variables `connection_label` and `title` in this file and the
analogous usages around lines 256-275). Ensure you add the uuid crate import or
reference the workspace identifier you choose, and update all places that form
the snapshot key/document id to use the new unique label instead of the static
"default".
---
Nitpick comments:
In `@src/openhuman/composio/providers/notion.rs`:
- Around line 117-121: Change the steady-state/tracing calls that are currently
using tracing::info to tracing::debug so periodic sync/trigger logs are emitted
at debug level instead of info; specifically update the tracing::info invocation
that logs "[composio:notion] sync starting" (the call that includes
connection_id = ?ctx.connection_id and reason = reason.as_str()), and make the
equivalent changes to the other entry/exit/trigger log blocks referenced around
the other spots (the blocks at the ranges you noted) so that all routine
sync/dispatch traces use tracing::debug instead of tracing::info.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ead735b5-955d-46f6-8e1a-ad72bbfcd837
📒 Files selected for processing (4)
- src/openhuman/composio/bus.rs
- src/openhuman/composio/mod.rs
- src/openhuman/composio/periodic.rs
- src/openhuman/composio/providers/notion.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- src/openhuman/composio/periodic.rs
- src/openhuman/composio/mod.rs
```rust
let connection_label = ctx
    .connection_id
    .clone()
    .unwrap_or_else(|| "default".to_string());
let title = format!("notion sync — {connection_label}");
```
Avoid the shared "default" snapshot key when connection_id is missing.
src/openhuman/composio/bus.rs can dispatch trigger-driven syncs without a connection_id. In that case this code persists every Notion snapshot under the same title/document id (composio-notion-default), so one workspace can overwrite another while the sync still reports success.
🔧 Safer persistence guard
```diff
-        let connection_label = ctx
-            .connection_id
-            .clone()
-            .unwrap_or_else(|| "default".to_string());
+        let Some(connection_label) = ctx.connection_id.clone() else {
+            tracing::warn!(
+                "[composio:notion] missing connection_id; skipping snapshot persist to avoid cross-connection overwrite"
+            );
+            return Ok(0);
+        };
```

Also applies to: 256-275
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/openhuman/composio/providers/notion.rs` around lines 243 - 247, The code
currently uses a shared literal "default" when ctx.connection_id is None,
causing collisions; replace the unwrap_or_else(|| "default".to_string()) with
generation of a non-colliding identifier (e.g., uuid::Uuid::new_v4().to_string()
or include ctx.workspace_id if available) and use that value for
connection_label and the title/any snapshot document id creation (the variables
`connection_label` and `title` in this file and the analogous usages around
lines 256-275). Ensure you add the uuid crate import or reference the workspace
identifier you choose, and update all places that form the snapshot key/document
id to use the new unique label instead of the static "default".
Summary
- `ComposioProvider` async trait + per-toolkit registry under `src/openhuman/composio/providers/` — providers expose `fetch_user_profile`, `sync`, `on_connection_created`, and `on_trigger` hooks.
- Built-in providers for Gmail (`GMAIL_GET_PROFILE` + `GMAIL_FETCH_EMAILS`, reacts to `GMAIL_NEW_GMAIL_MESSAGE`) and Notion (`NOTION_GET_ABOUT_ME` + `NOTION_FETCH_DATA`, reacts to any Notion trigger by re-syncing).
- `openhuman.composio_get_user_profile` and `openhuman.composio_sync` resolve a connection's toolkit, dispatch to its provider, and return a normalized result.
- Event-bus subscribers for `ComposioTriggerReceived` and `ComposioConnectionCreated` — both route into the registered provider via spawned tasks so the bus dispatch loop never blocks.
- A periodic scheduler (`composio::periodic`) ticks every 60s, walks active connections, and fires `provider.sync(SyncReason::Periodic)` whenever a provider's `sync_interval_secs` has elapsed.

Problem
The Composio module was a thin RPC proxy: list/authorize/execute and a trigger event with a `// TODO: route into skill/channel/cron dispatch` stub. There was no way to ship per-toolkit logic — fetching a user's profile from Gmail looked nothing like Notion's, and nothing pulled connection data into memory until the user manually asked an agent to do it. With the QuickJS skills runtime gone (#508), there was also no JS path to lean on anymore.

We needed a native, in-process surface where each Composio toolkit can own its profile fetching, sync behavior, and trigger handling (the `composio:trigger` Socket.IO bridge) — without growing the dispatch layer every time a new toolkit lands.
Solution
Trait + registry, not a giant match.
`ComposioProvider` is the only thing the bus subscriber, periodic scheduler, and RPC ops know about. Adding a toolkit means dropping a new file in `providers/` and adding one line to `init_default_providers()`; no dispatch-layer changes.

Reuses existing infrastructure end-to-end:
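The trait-plus-registry pattern described above can be sketched with trait objects (sync methods here for brevity; the real `ComposioProvider` is an async trait, and the method names are taken from the summary):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

/// Sketch of the trait-object registry pattern.
trait ComposioProvider: Send + Sync {
    fn toolkit(&self) -> &'static str;
    fn fetch_user_profile(&self, connection_id: &str) -> Result<String, String>;
}

struct GmailProvider;
impl ComposioProvider for GmailProvider {
    fn toolkit(&self) -> &'static str { "gmail" }
    fn fetch_user_profile(&self, connection_id: &str) -> Result<String, String> {
        Ok(format!("gmail profile for {connection_id}"))
    }
}

fn registry() -> &'static Mutex<HashMap<String, Arc<dyn ComposioProvider>>> {
    static REG: OnceLock<Mutex<HashMap<String, Arc<dyn ComposioProvider>>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashMap::new()))
}

fn register(provider: Arc<dyn ComposioProvider>) {
    registry().lock().unwrap().insert(provider.toolkit().to_string(), provider);
}

/// The only lookup the bus subscriber, scheduler, and RPC ops need.
fn get_composio_provider(toolkit: &str) -> Option<Arc<dyn ComposioProvider>> {
    registry().lock().unwrap().get(toolkit).cloned()
}

fn main() {
    register(Arc::new(GmailProvider)); // one registration line per new toolkit
    let p = get_composio_provider("gmail").expect("registered");
    assert_eq!(p.fetch_user_profile("c1").unwrap(), "gmail profile for c1");
    assert!(get_composio_provider("slack").is_none()); // unknown toolkit fails cleanly
    println!("dispatched via registry");
}
```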
- All backend calls go through `ComposioClient` (no direct Composio API calls).
- Snapshots persist via `MemoryClient::store_skill_sync`, so they land in the same `skill-{id}` namespaces the agent loop already recalls from.
- Bootstrap wiring lives in `channels/runtime/startup.rs` + `core/jsonrpc.rs`, next to the existing `register_composio_trigger_subscriber()` call.

Defensive provider parsing. Composio's response envelope is inconsistent (`data.messages` vs `messages` vs `data.data.messages`). Each provider uses tolerant `serde_json::Value::pointer` walks + a shared `pick_str` helper for dotted-path field extraction, so a backend wrapper change won't break the providers silently.

Connection-created hook delays briefly before running the initial sync — the OAuth handoff event fires when the
`connectUrl` is returned, before the user has actually clicked through, so an immediate provider call would race the connection going `ACTIVE`. A 2-second sleep before the first sync is a pragmatic stopgap; a future improvement is listening for an explicit "connection_active" backend event.

Trade-offs:
Submission Checklist
- Unit tests cover `extract_messages`/`extract_results` walking common envelope shapes, `pick_str` dotted-path extraction, `SyncOutcome::elapsed_ms` saturation, bus subscriber dispatch on real `DomainEvent`s without panic, and periodic tick interval sanity. All passing via `cargo test --lib openhuman::composio::`.
- The new ops (`composio_get_user_profile`, `composio_sync`) need a live backend connection to exercise meaningfully, and there's no UI surface yet that calls them. Follow-up will add UI + JSON-RPC e2e against the mock backend once the desktop settings panel wires them up.
- Tracing is tagged (`[composio:provider]`, `[composio:bus]`, `[composio:periodic]`) on every checkpoint.
- Non-obvious decisions (not advancing `last_sync_at` on failure, why we use `pointer` walks across multiple shapes, why one provider per toolkit) all have inline rationale.

Impact
- New RPC ops: `openhuman.composio_get_user_profile` and `openhuman.composio_sync`. Both fail cleanly when no provider is registered for a connection's toolkit ("no native provider registered for toolkit X").
- Synced snapshots land in the `skill-gmail`/`skill-notion` namespaces. This is intentional so the existing agent recall path picks them up without any extra wiring — but it means a future split between "composio sync" and "js skill sync" data would need a namespace migration.
- All backend traffic goes through `ComposioClient`, which already handles JWT auth and never holds raw OAuth tokens locally.
- Existing `composio_*` ops, tools, and the `ComposioTriggerSubscriber` log line all still behave the same (the subscriber now also dispatches into providers).

Related
- Follow-up: wire `composio_get_user_profile`/`composio_sync` into the desktop ConnectionsPanel so connected accounts show display name + email.

Summary by CodeRabbit
New Features
Tests