Move conversation persistence into workspace memory store#503

Merged
senamakel merged 4 commits into tinyhumansai:main from senamakel:feat/conver-workspace
Apr 11, 2026

Conversation

@senamakel
Member

@senamakel senamakel commented Apr 11, 2026

Summary

  • Move conversation thread and message persistence from persisted Redux state into the Rust memory module backed by workspace JSONL files.
  • Add memory RPC controllers for listing, creating, appending, updating, deleting, and purging conversation threads/messages.
  • Update the app thread API, thread slice, and conversations page to load and persist chat state through core RPC instead of browser storage.
  • Keep Redux as transient UI cache only; thread history now survives at the workspace level.
  • Add focused Rust and Vitest coverage for the new conversation persistence path.

Problem

  • The current conversations UI stores all thread and message history in persisted Redux state.
  • That makes session history a frontend-owned durability concern instead of workspace-owned application state.
  • It also prevents the Rust core memory layer from being the source of truth for conversations and makes thread/message state harder to inspect, migrate, or reuse outside the UI process.

Solution

  • Add a new workspace-backed conversation store under src/openhuman/memory/conversations/.
  • Store thread metadata in append-only threads.jsonl and store each thread's messages in its own JSONL log under the workspace memory directory.
  • Expose the store through new memory RPC methods so the frontend can list threads, create the default thread, load messages, append streamed assistant/user messages, update message metadata for reactions, delete threads, and purge all conversation state.
  • Replace persisted Redux thread storage with async core-backed loading and writing while preserving the existing UI flow in Conversations.tsx.
  • Keep debug logging and JSONL rewrite behavior explicit for metadata patch operations such as reactions.
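The append-only JSONL layout described above can be sketched as follows. This is a TypeScript illustration, not the Rust implementation; the record shape (`op`, `id`, `title`, `createdAt`) and the `threadsPath` layout are hypothetical stand-ins for the types in src/openhuman/memory/conversations/types.rs:

```typescript
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

// Hypothetical record shape; the real types live in the Rust conversations module.
interface ThreadRecord {
  op: "upsert" | "delete";
  id: string;
  title?: string;
  createdAt?: string;
}

const threadsPath = (workspaceDir: string): string =>
  path.join(workspaceDir, "memory", "conversations", "threads.jsonl");

// Append-only write: every change is a new line, never an in-place edit.
const appendThreadRecord = (workspaceDir: string, record: ThreadRecord): void => {
  fs.mkdirSync(path.dirname(threadsPath(workspaceDir)), { recursive: true });
  fs.appendFileSync(threadsPath(workspaceDir), JSON.stringify(record) + "\n");
};

// The read path replays the log: later upserts win, deletes drop the entry.
const listThreads = (workspaceDir: string): ThreadRecord[] => {
  const file = threadsPath(workspaceDir);
  if (!fs.existsSync(file)) return [];
  const byId = new Map<string, ThreadRecord>();
  for (const line of fs.readFileSync(file, "utf8").split("\n")) {
    if (!line.trim()) continue;
    const record = JSON.parse(line) as ThreadRecord;
    if (record.op === "delete") byId.delete(record.id);
    else byId.set(record.id, record);
  }
  return [...byId.values()];
};
```

The upside of this layout is that writes are a single `O(1)` append and the full history stays inspectable with plain text tools; the cost is paid on read, where the log is replayed.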

Submission Checklist

  • Unit tests — Vitest (app/) and/or cargo test (core) for logic you add or change
  • E2E / integration — Where behavior is user-visible or crosses UI → Tauri → sidecar → JSON-RPC; use existing harnesses (app/test/e2e, mock backend, tests/json_rpc_e2e.rs as appropriate)
  • N/A — No new E2E spec added in this PR; covered with focused Rust store tests plus app RPC client test and compile checks
  • Doc comments — /// and //! (Rust); JSDoc or brief file/module headers (TS) on public APIs and non-obvious modules
  • Inline comments — Where logic, invariants, or edge cases aren’t clear from names alone (keep them grep-friendly; avoid restating the code)

Impact

  • Desktop conversation persistence now lives under the workspace instead of browser storage.
  • This changes the durability boundary but keeps the existing user-facing conversation flow intact.
  • Message metadata updates now rewrite a thread JSONL file, which is acceptable for the current low-frequency reaction/update path.
  • Existing repo-wide lint warnings unrelated to this change still appear during pre-push, but push-blocking checks passed.
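The JSONL rewrite on metadata updates noted above might look roughly like this. This is a TypeScript sketch of the strategy, with `MessageRecord` and the on-disk shape assumed rather than taken from the actual store:

```typescript
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

// Hypothetical message shape; the real type is the Rust ConversationMessage.
interface MessageRecord {
  id: string;
  text: string;
  extraMetadata?: Record<string, unknown>;
}

// O(n) rewrite: read every line, patch the matching message, write the whole
// file back. Acceptable for the current low-frequency reaction/update path.
const patchMessage = (
  messagesFile: string,
  messageId: string,
  extraMetadata: Record<string, unknown>
): void => {
  const messages = fs
    .readFileSync(messagesFile, "utf8")
    .split("\n")
    .filter(line => line.trim())
    .map(line => JSON.parse(line) as MessageRecord);
  const patched = messages.map(m =>
    m.id === messageId ? { ...m, extraMetadata: { ...m.extraMetadata, ...extraMetadata } } : m
  );
  // Write to a temp file and rename so a crash never leaves a half-written log.
  const tmp = messagesFile + ".tmp";
  fs.writeFileSync(tmp, patched.map(m => JSON.stringify(m)).join("\n") + "\n");
  fs.renameSync(tmp, messagesFile);
};
```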

Related

  • Issue(s):
  • Follow-up PR(s)/TODOs:
    • Add end-to-end coverage for conversation restore/reload and purge flows through the desktop stack.

Summary by CodeRabbit

  • New Features

    • Conversations and messages now persist to workspace-backed storage; persisted channel turns are recorded and de-duplicated.
    • Message loading stays in sync with the selected thread; GIF responses are flagged as GIFs.
    • Reactions persist and synchronize from both events and the UI.
  • Tests

    • Added unit tests covering thread API behavior.
  • Refactor

    • Thread/message operations migrated from REST to RPC and storage/persistence behavior updated.

@coderabbitai
Contributor

coderabbitai bot commented Apr 11, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a44577a6-7e62-4ebc-aedf-15ef89e8e43a

📥 Commits

Reviewing files that changed from the base of the PR and between 8a5cfb1 and 66083bd.

📒 Files selected for processing (3)
  • src/core/jsonrpc.rs
  • src/openhuman/channels/runtime/startup.rs
  • src/openhuman/event_bus/events.rs
✅ Files skipped from review due to trivial changes (1)
  • src/openhuman/channels/runtime/startup.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/openhuman/event_bus/events.rs
  • src/core/jsonrpc.rs

📝 Walkthrough

Walkthrough

Replaces frontend thread/message flows with RPC-backed memory calls, removes persisted Redux optimistic send flows, adds workspace-backed JSONL conversation store and RPC handlers, and registers an event-bus subscriber that persists channel turns into the workspace store.

Changes

Cohort / File(s) Summary
Frontend Conversations UI
app/src/pages/Conversations.tsx
Rewires thread lifecycle to dispatch loadThreads(), createThreadLocal(...), setSelectedThread(...), and loadThreadMessages(...); adds effect to load messages on selectedThreadId changes; replaces addReaction with persistReaction; prefixes several dispatches with void; GIF inference responses include type: 'gif'.
Frontend API & Tests
app/src/services/api/threadApi.ts, app/src/services/api/threadApi.test.ts
Migrates REST/tauri calls to callCoreRpc using openhuman.memory_* RPCs; changes createThread signature to { id, title, createdAt }; adds appendMessage/updateMessage; removes sendMessage/getSuggestQuestions; introduces Envelope<T> unwrapping; adds unit tests for list/append.
Frontend Store / Slice
app/src/store/index.ts, app/src/store/threadSlice.ts
Removes redux-persist for thread slice; drops optimistic send reducers; adds async thunks (loadThreads, createThreadLocal, loadThreadMessages, addMessageLocal, addInferenceResponse, persistReaction) and cache helpers (appendMessageToCache, replaceMessagesForThread); introduces isLoadingThreads and adjusts selection/active-thread handling.
Rust: Conversation Store & Types
src/openhuman/memory/conversations/store.rs, src/openhuman/memory/conversations/types.rs, src/openhuman/memory/conversations/mod.rs
Adds workspace-backed JSONL conversation store with ConversationStore, thread/message models (ConversationThread, ConversationMessage, CreateConversationThread, ConversationMessagePatch), JSONL layout (threads.jsonl + per-thread files), ensure/list/append/update/delete/purge operations, concurrency via a global mutex, and unit tests.
Rust: RPC Models, Ops & Schemas
src/openhuman/memory/rpc_models.rs, src/openhuman/memory/ops.rs, src/openhuman/memory/schemas.rs, src/openhuman/memory/mod.rs
Adds RPC models and seven new RPC handlers (memory_threads_list, memory_thread_upsert, memory_messages_list, memory_message_append, memory_message_update, memory_thread_delete, memory_threads_purge); wires handlers into schema dispatch and returns ApiEnvelope-wrapped responses with counts.
Rust: Event Bus Subscriber & Wiring
src/openhuman/memory/conversations/bus.rs, src/openhuman/channels/runtime/startup.rs, src/core/jsonrpc.rs
Introduces conversation persistence subscriber that registers once with a workspace_dir, persists channel turns into workspace threads/messages, prevents duplicate events, and registers the subscriber at channel startup via start_channels/domain subscriber bootstrap.
Rust: Event Payload & Channel Changes
src/openhuman/event_bus/events.rs, src/openhuman/channels/runtime/dispatch.rs, src/openhuman/channels/mod.rs
Extends DomainEvent::ChannelMessageReceived/Processed payloads to include message_id and reply_target (and thread_ts for processed); process_channel_message populates these fields; makes channels::context pub(crate).

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Frontend as Frontend UI
    participant Core as Core RPC Server
    participant Store as ConversationStore (JSONL)
    participant Bus as EventBus Subscriber

    Frontend->>Core: callCoreRpc(openhuman.memory_threads_list)
    Core->>Store: list_threads(workspace_dir)
    Store-->>Core: threads list
    Core-->>Frontend: ApiEnvelope{threads}

    Frontend->>Core: callCoreRpc(openhuman.memory_message_append)
    Core->>Store: append_message(...)
    Store-->>Core: appended message
    Core-->>Frontend: ApiEnvelope{message}

    Bus->>Store: ensure_thread(...) / append_message(...) (on ChannelMessageReceived/Processed)
    Store-->>Bus: persisted message record
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Suggested reviewers

  • YellowSnnowmann

Poem

🐇
I hop through threads both old and new,
JSONL burrows where messages grew.
Redux nudges RPC’s door,
Workspace keeps tales forevermore —
nibble bytes, persist, and chew! 🎉

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Docstring Coverage: ⚠️ Warning. Docstring coverage is 40.79%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
Title Check: ✅ Passed. The PR title 'Move conversation persistence into workspace memory store' clearly and concisely summarizes the main change: migrating conversation persistence from Redux state to a workspace-backed memory store.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (3)
src/openhuman/memory/conversations/store.rs (2)

18-18: Global mutex serializes all workspace operations.

CONVERSATION_STORE_LOCK is a single global mutex shared across all ConversationStore instances. If the application ever operates on multiple workspaces concurrently, this could become a contention point.

For now this is likely acceptable given single-workspace usage, but consider per-workspace locking (e.g., keyed by workspace_dir) if multi-workspace support becomes a requirement.
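The suggested per-workspace locking could be sketched in TypeScript as an async mutex keyed by workspace path (the review proposes DashMap or once_cell on the Rust side; `withWorkspaceLock` and `lockTails` are hypothetical names, not part of the PR):

```typescript
// One promise chain per workspace path: operations on the same workspace
// serialize, operations on different workspaces run independently.
const lockTails = new Map<string, Promise<unknown>>();

const withWorkspaceLock = <T>(
  workspaceDir: string,
  critical: () => Promise<T>
): Promise<T> => {
  const previous = lockTails.get(workspaceDir) ?? Promise.resolve();
  // Chain onto the previous tail; run the critical section regardless of the
  // previous outcome so one failed operation cannot poison the lock.
  const run = previous.then(critical, critical);
  // Track completion and drop the map entry once no later caller has queued
  // behind us, avoiding unbounded growth of the lock map.
  const tail: Promise<unknown> = run.catch(() => undefined).then(() => {
    if (lockTails.get(workspaceDir) === tail) lockTails.delete(workspaceDir);
  });
  lockTails.set(workspaceDir, tail);
  return run;
};
```

The cleanup step mirrors the "locks are cleaned up if needed" concern from the review: without it, every workspace ever touched would leave a map entry behind.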

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/conversations/store.rs` at line 18,
CONVERSATION_STORE_LOCK is a single global Mutex that serializes all
ConversationStore operations and will become a contention point for multiple
workspaces; replace it with a keyed lock map (e.g., a static concurrent map from
workspace_dir to a per-workspace Mutex or RwLock) and change ConversationStore
access paths to acquire the lock for the specific workspace_dir key rather than
the global CONVERSATION_STORE_LOCK; ensure the keyed map entry creation is
thread-safe (use DashMap, once_cell + Mutex<HashMap<..>>, or similar) and that
locks are cleaned up if needed to avoid unbounded growth.

114-143: Consider append-only patching for message updates.

update_message currently reads all messages, patches one, and rewrites the entire JSONL file. For threads with many messages, this O(n) rewrite on every reaction toggle could become a performance bottleneck.

Consider an append-only patch strategy (e.g., appending {"op":"patch","message_id":"...","extra_metadata":{...}} entries) and reconstructing state on read, similar to how threads.jsonl handles upsert/delete operations. This would make updates O(1) at the cost of slightly more complex read logic.
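The proposed append-only patch strategy can be illustrated with a small replay function. The record shapes here (`op: "patch"`, `messageId`, `extraMetadata`) are assumptions mirroring the suggestion, not the actual serialized format:

```typescript
// Base message lines have no "op" (or op: "message"); patch lines carry
// op: "patch" and are folded into the target message at read time.
interface BaseRecord {
  op?: "message";
  id: string;
  text: string;
  extraMetadata?: Record<string, unknown>;
}
interface PatchRecord {
  op: "patch";
  messageId: string;
  extraMetadata: Record<string, unknown>;
}
type LogRecord = BaseRecord | PatchRecord;

// Replay the log: collect base messages, then merge each patch into its
// target. Writes become O(1) appends; reads pay the replay cost.
const replayMessages = (lines: string[]): BaseRecord[] => {
  const byId = new Map<string, BaseRecord>();
  for (const line of lines) {
    if (!line.trim()) continue;
    const record = JSON.parse(line) as LogRecord;
    if (record.op === "patch") {
      const target = byId.get(record.messageId);
      if (target) {
        target.extraMetadata = { ...target.extraMetadata, ...record.extraMetadata };
      }
    } else {
      byId.set(record.id, record);
    }
  }
  return [...byId.values()];
};
```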

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/conversations/store.rs` around lines 114 - 143, The
current update_message in update_message reads all messages and calls
rewrite_jsonl, causing O(n) writes; change it to append-only by writing a patch
record (e.g., {"op":"patch","message_id":..., "extra_metadata":...}) to the same
path returned by thread_messages_path(thread_id) instead of mutating the whole
file, and update the reader logic (either read_jsonl or the thread read path
that constructs ConversationMessage state) to replay base messages plus
subsequent patch entries to reconstruct the current ConversationMessage state on
read; ensure ConversationMessagePatch is serialized in the appended record
format and that update_message returns the reconstructed updated
ConversationMessage after appending.
app/src/store/threadSlice.ts (1)

47-84: Use arrow helpers for the new cache utilities.

These two module-level helpers are new code, and the repo convention here is const + arrow functions.

♻️ Suggested refactor
-function appendMessageToCache(
+const appendMessageToCache = (
   state: ThreadState,
   threadId: string,
   message: ThreadMessage,
   replaceExisting = false
-) {
+) => {
   const existing = state.messagesByThreadId[threadId] ?? [];
   const nextStored = replaceExisting
     ? existing.map(entry => (entry.id === message.id ? message : entry))
@@
   if (thread) {
     thread.messageCount = nextStored.length;
     thread.lastMessageAt =
       nextStored.length > 0 ? nextStored[nextStored.length - 1].createdAt : thread.createdAt;
   }
-}
+};
 
-function replaceMessagesForThread(state: ThreadState, threadId: string, messages: ThreadMessage[]) {
+const replaceMessagesForThread = (
+  state: ThreadState,
+  threadId: string,
+  messages: ThreadMessage[]
+) => {
   state.messagesByThreadId[threadId] = messages;
   if (threadId === state.selectedThreadId) {
     state.messages = messages;
@@
   if (thread) {
     thread.messageCount = messages.length;
     thread.lastMessageAt =
       messages.length > 0 ? messages[messages.length - 1].createdAt : thread.createdAt;
   }
-}
+};

As per coding guidelines, **/*.{js,jsx,ts,tsx}: "Use const by default, let if reassignment is needed, avoid var" and "Prefer arrow functions over function declarations".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/src/store/threadSlice.ts` around lines 47 - 84, Convert the two function
declarations appendMessageToCache and replaceMessagesForThread into const arrow
function helpers to follow the repo convention (use const + arrow functions);
specifically, replace "function appendMessageToCache(...)" with "const
appendMessageToCache = (...) => { ... }" and "function
replaceMessagesForThread(...)" with "const replaceMessagesForThread = (...) => {
... }" while preserving all parameter names, logic, and references (state,
threadId, message, messages, replaceExisting) so callers and exports remain
unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d9164a30-bbc9-4781-a5f3-b426dd2a4500

📥 Commits

Reviewing files that changed from the base of the PR and between 57d307a and f32cb92.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • app/src-tauri/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (12)
  • app/src/pages/Conversations.tsx
  • app/src/services/api/threadApi.test.ts
  • app/src/services/api/threadApi.ts
  • app/src/store/index.ts
  • app/src/store/threadSlice.ts
  • src/openhuman/memory/conversations/mod.rs
  • src/openhuman/memory/conversations/store.rs
  • src/openhuman/memory/conversations/types.rs
  • src/openhuman/memory/mod.rs
  • src/openhuman/memory/ops.rs
  • src/openhuman/memory/rpc_models.rs
  • src/openhuman/memory/schemas.rs

Comment on lines 276 to 289
useEffect(() => {
const defaultThread = threads.find(t => t.id === DEFAULT_THREAD_ID);

if (!defaultThread) {
dispatch(
createThreadLocal({
id: DEFAULT_THREAD_ID,
title: DEFAULT_THREAD_TITLE,
createdAt: new Date().toISOString(),
})
);
}

// Always set selected thread to ensure messages view is synced from persisted storage
dispatch(setSelectedThread(DEFAULT_THREAD_ID));
void dispatch(loadThreads());
void dispatch(
createThreadLocal({
id: DEFAULT_THREAD_ID,
title: DEFAULT_THREAD_TITLE,
createdAt: new Date().toISOString(),
})
).then(() => {
dispatch(setSelectedThread(DEFAULT_THREAD_ID));
void dispatch(loadThreadMessages(DEFAULT_THREAD_ID));
});
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [dispatch]);
Contributor


🛠️ Refactor suggestion | 🟠 Major

Convert .then() chain to async/await.

Per coding guidelines, prefer async/await for promises instead of .then() chains. Consider wrapping the initialization in an async IIFE:

  useEffect(() => {
-   void dispatch(loadThreads());
-   void dispatch(
-     createThreadLocal({
-       id: DEFAULT_THREAD_ID,
-       title: DEFAULT_THREAD_TITLE,
-       createdAt: new Date().toISOString(),
-     })
-   ).then(() => {
-     dispatch(setSelectedThread(DEFAULT_THREAD_ID));
-     void dispatch(loadThreadMessages(DEFAULT_THREAD_ID));
-   });
+   void (async () => {
+     void dispatch(loadThreads());
+     await dispatch(
+       createThreadLocal({
+         id: DEFAULT_THREAD_ID,
+         title: DEFAULT_THREAD_TITLE,
+         createdAt: new Date().toISOString(),
+       })
+     );
+     dispatch(setSelectedThread(DEFAULT_THREAD_ID));
+     void dispatch(loadThreadMessages(DEFAULT_THREAD_ID));
+   })();
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [dispatch]);

As per coding guidelines: "Always use async/await for promises in TypeScript instead of .then() chains."

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
useEffect(() => {
const defaultThread = threads.find(t => t.id === DEFAULT_THREAD_ID);
if (!defaultThread) {
dispatch(
createThreadLocal({
id: DEFAULT_THREAD_ID,
title: DEFAULT_THREAD_TITLE,
createdAt: new Date().toISOString(),
})
);
}
// Always set selected thread to ensure messages view is synced from persisted storage
dispatch(setSelectedThread(DEFAULT_THREAD_ID));
void dispatch(loadThreads());
void dispatch(
createThreadLocal({
id: DEFAULT_THREAD_ID,
title: DEFAULT_THREAD_TITLE,
createdAt: new Date().toISOString(),
})
).then(() => {
dispatch(setSelectedThread(DEFAULT_THREAD_ID));
void dispatch(loadThreadMessages(DEFAULT_THREAD_ID));
});
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [dispatch]);
useEffect(() => {
void (async () => {
void dispatch(loadThreads());
await dispatch(
createThreadLocal({
id: DEFAULT_THREAD_ID,
title: DEFAULT_THREAD_TITLE,
createdAt: new Date().toISOString(),
})
);
dispatch(setSelectedThread(DEFAULT_THREAD_ID));
void dispatch(loadThreadMessages(DEFAULT_THREAD_ID));
})();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [dispatch]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/src/pages/Conversations.tsx` around lines 276 - 289, The current
useEffect uses a .then() chain after dispatch(createThreadLocal(...)); convert
this to async/await by defining an async IIFE inside useEffect (or an async
inner function) and await dispatch(createThreadLocal({...})) before calling
dispatch(setSelectedThread(DEFAULT_THREAD_ID)) and await
dispatch(loadThreadMessages(DEFAULT_THREAD_ID)); keep the initial void
dispatch(loadThreads()); call as-is or await it if desired, and wrap the awaited
calls in try/catch to handle errors; update references: useEffect, loadThreads,
createThreadLocal, setSelectedThread, loadThreadMessages, DEFAULT_THREAD_ID,
DEFAULT_THREAD_TITLE.

Comment on lines +12 to +21
interface Envelope<T> {
data?: T;
}

function unwrapEnvelope<T>(response: Envelope<T> | T): T {
if (response && typeof response === 'object' && 'data' in response) {
return (response as Envelope<T>).data as T;
}
return response as T;
}
Contributor


⚠️ Potential issue | 🟠 Major

Don’t treat envelope failures as successful responses.

unwrapEnvelope ignores the backend error field and will return undefined when data is absent. The slice's fulfilled reducers dereference these results immediately, so an envelope-level RPC failure surfaces later as a `Cannot read properties of undefined` error instead of a rejected thunk.

🐛 Suggested fix
 interface Envelope<T> {
   data?: T;
+  error?: {
+    code?: string;
+    message: string;
+  } | null;
 }
 
-function unwrapEnvelope<T>(response: Envelope<T> | T): T {
-  if (response && typeof response === 'object' && 'data' in response) {
-    return (response as Envelope<T>).data as T;
+const unwrapEnvelope = <T,>(response: Envelope<T> | T): T => {
+  if (response && typeof response === 'object' && ('data' in response || 'error' in response)) {
+    const envelope = response as Envelope<T>;
+    if (envelope.error) {
+      throw new Error(envelope.error.message);
+    }
+    if (typeof envelope.data === 'undefined') {
+      throw new Error('RPC response did not include data');
+    }
+    return envelope.data;
   }
   return response as T;
-}
+};
📝 Committable suggestion


Suggested change
interface Envelope<T> {
data?: T;
}
function unwrapEnvelope<T>(response: Envelope<T> | T): T {
if (response && typeof response === 'object' && 'data' in response) {
return (response as Envelope<T>).data as T;
}
return response as T;
}
interface Envelope<T> {
data?: T;
error?: {
code?: string;
message: string;
} | null;
}
const unwrapEnvelope = <T,>(response: Envelope<T> | T): T => {
if (response && typeof response === 'object' && ('data' in response || 'error' in response)) {
const envelope = response as Envelope<T>;
if (envelope.error) {
throw new Error(envelope.error.message);
}
if (typeof envelope.data === 'undefined') {
throw new Error('RPC response did not include data');
}
return envelope.data;
}
return response as T;
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/src/services/api/threadApi.ts` around lines 12 - 21, The unwrapEnvelope
function currently treats an envelope with a missing data field as a successful
response; update unwrapEnvelope<T> to detect envelope-level failures by checking
for an "error" property on the response object and, if present (or if data is
undefined/null when an envelope was expected), throw an Error (or rethrow the
embedded error message) so the thunk rejects instead of returning undefined;
preserve behavior for raw non-envelope responses by returning the value when the
object is not an envelope. Ensure you update the Envelope<T> handling in
unwrapEnvelope and any places calling unwrapEnvelope to expect a thrown error
for envelope failures.

Comment on lines +186 to +193
/// Response payload for purging all workspace-backed conversations.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PurgeConversationThreadsResponse {
pub messages_deleted: usize,
pub agent_threads_deleted: usize,
pub agent_messages_deleted: usize,
}
Contributor


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how PurgeConversationThreadsResponse is constructed from ConversationPurgeStats
rg -n -A 10 'PurgeConversationThreadsResponse' --type rust

Repository: tinyhumansai/openhuman

Length of output: 2807


🏁 Script executed:

rg -n 'struct ConversationPurgeStats' --type rust -A 5

Repository: tinyhumansai/openhuman

Length of output: 469


Fix agent_messages_deleted mapping in PurgeConversationThreadsResponse.

agent_messages_deleted is incorrectly assigned the same value as messages_deleted (both use stats.message_count). The field names suggest they should represent distinct counts—either provide a separate stat for agent messages or remove the field if unneeded.

In ops.rs lines 1126-1130, change the mapping from:

agent_messages_deleted: stats.message_count,

to the correct value, or remove this field entirely if not required.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/rpc_models.rs` around lines 186 - 193, The
PurgeConversationThreadsResponse struct's agent_messages_deleted field is being
incorrectly populated with stats.message_count (same as messages_deleted);
locate the code that constructs PurgeConversationThreadsResponse (the mapping
where agent_messages_deleted: stats.message_count is set) and either replace
stats.message_count with the correct agent-specific counter from the stats
object (e.g., stats.agent_message_count or similar) or remove the
agent_messages_deleted field from the response and struct if no distinct agent
count exists; update the PurgeConversationThreadsResponse definition and all
construction sites (referencing the struct name PurgeConversationThreadsResponse
and the agent_messages_deleted identifier) to keep the types and serialization
consistent.

…channel events. Update domain subscriber registration to include workspace directory, ensuring proper message handling and persistence across channels. Refactor event structure to include message ID and reply target for improved tracking. Additionally, adjust module visibility for context management.
Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (3)
src/openhuman/memory/conversations/bus.rs (2)

148-150: Minor: Prefer &Path over &PathBuf for function parameters.

Using &Path is more idiomatic as it accepts both &Path and &PathBuf without requiring the caller to have a PathBuf.

📝 Suggested change
 fn persist_channel_turn(
-    workspace_dir: &PathBuf,
+    workspace_dir: &Path,
     descriptor: ChannelTurnDescriptor<'_>,
 ) -> Result<(), String> {

This would require adding use std::path::Path; to the imports.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/conversations/bus.rs` around lines 148 - 150, Change the
persist_channel_turn signature to accept workspace_dir: &Path instead of
&PathBuf and add use std::path::Path to imports; update any call sites of
persist_channel_turn (they can continue passing &PathBuf as &Path will coerce)
to match the new signature and adjust any internal code that relied on
PathBuf-specific methods to either dereference or call .to_path_buf() where
necessary.
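The deref coercion this suggestion relies on can be seen in a tiny sketch; `thread_file` and the paths here are invented for illustration, not taken from the PR:

```rust
use std::path::{Path, PathBuf};

// Accepting &Path lets callers pass either a borrowed Path or a &PathBuf.
fn thread_file(workspace_dir: &Path, thread_id: &str) -> PathBuf {
    workspace_dir.join("memory").join(format!("{thread_id}.jsonl"))
}

fn main() {
    let owned: PathBuf = PathBuf::from("/tmp/workspace");
    // &PathBuf coerces to &Path automatically (PathBuf: Deref<Target = Path>).
    let a = thread_file(&owned, "default");
    // A borrowed &Path works too, with no allocation required.
    let b = thread_file(Path::new("/tmp/workspace"), "default");
    assert_eq!(a, b);
}
```

Because of the coercion, existing call sites that hold a `PathBuf` need no changes beyond passing a reference.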

175-186: Consider a more efficient duplicate check for high-volume scenarios.

The current implementation reads all messages from the thread file on every persist call to check for duplicates. This is O(n) per message and could become slow for very long-running conversations.

For typical channel message rates this is acceptable, but if you anticipate high-volume use, consider caching recently-persisted message IDs or using an index file.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/memory/conversations/bus.rs` around lines 175 - 186, The
duplicate check currently calls get_messages(workspace_dir.clone(), &thread_id)
and scans every message to compare message.id against persisted_message_id
(built from descriptor.role and descriptor.message_id), which is O(n) per
persist; replace this with a more efficient approach by introducing a per-thread
index of persisted IDs (e.g., an on-disk index file or an in-memory HashSet
cached in a ThreadsIndex or ConversationStore) and update that index when you
append a new message so subsequent calls to the persist routine can check
membership in O(1) instead of iterating all messages; modify the code paths
around the persist function that build persisted_message_id and the call sites
that use get_messages to consult the new index (and fall back to scanning on
cache-miss or index corruption) and ensure thread-safe access when updating the
index.
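A sketch of the O(1) membership check suggested above; `PersistedIndex` and `mark_if_new` are hypothetical names, and a real implementation would also need the on-disk fallback and thread-safe access the prompt mentions:

```rust
use std::collections::HashSet;

// In-memory index of "<role>:<message_id>" keys already persisted to a
// thread's JSONL file, so duplicate checks avoid rescanning the file.
struct PersistedIndex {
    seen: HashSet<String>,
}

impl PersistedIndex {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    // Returns true if the message is new and should be appended;
    // HashSet::insert returns false when the key was already present.
    fn mark_if_new(&mut self, role: &str, message_id: &str) -> bool {
        self.seen.insert(format!("{role}:{message_id}"))
    }
}

fn main() {
    let mut index = PersistedIndex::new();
    assert!(index.mark_if_new("assistant", "msg-1")); // first time: persist
    assert!(!index.mark_if_new("assistant", "msg-1")); // duplicate: skip
    assert!(index.mark_if_new("user", "msg-1")); // different role, new key
}
```

On startup or cache miss the index would be rebuilt by one scan of the thread file, after which every subsequent persist call is a constant-time lookup.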
src/core/jsonrpc.rs (1)

837-838: Consider updating the log message to include conversations.

The log message lists registered subscribers but doesn't mention the newly added conversation persistence subscriber.

📝 Suggested log message update
         log::info!(
-            "[event_bus] webhook, channel, health, skill, and restart subscribers registered"
+            "[event_bus] webhook, channel, health, skill, conversation, and restart subscribers registered"
         );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/core/jsonrpc.rs` around lines 837 - 838, Update the log::info call that
currently prints "[event_bus] webhook, channel, health, skill, and restart
subscribers registered" to also mention the conversation persistence subscriber
(e.g., include "conversation(s)" or "conversation persistence" in the message);
locate the log::info invocation in jsonrpc.rs that emits the event_bus
subscriber registration message and modify the string to list conversations
alongside webhook, channel, health, skill, and restart so the log accurately
reflects the newly added subscriber.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 21b2c7ad-b860-47b3-a2a3-2612bc45c27b

📥 Commits

Reviewing files that changed from the base of the PR and between f32cb92 and 8a5cfb1.

📒 Files selected for processing (7)
  • src/core/jsonrpc.rs
  • src/openhuman/channels/mod.rs
  • src/openhuman/channels/runtime/dispatch.rs
  • src/openhuman/channels/runtime/startup.rs
  • src/openhuman/event_bus/events.rs
  • src/openhuman/memory/conversations/bus.rs
  • src/openhuman/memory/conversations/mod.rs
✅ Files skipped from review due to trivial changes (2)
  • src/openhuman/channels/mod.rs
  • src/openhuman/memory/conversations/mod.rs

