Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 84 additions & 38 deletions api/routers/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ async def close_stream(stream):
sampled=False,
)
if full_content.strip():
yield emit("chunk", {"chunk": "\n\n[Connection interrupted. Partial technical response delivered.]"})
mode_label = "technical " if mode == TECHNICAL_MODE else ""
yield emit("chunk", {"chunk": f"\n\n[Connection interrupted. Partial {mode_label}response delivered.]"})
yield emit("done", "[DONE]")
if full_content.strip():
await cache_set(_cache_key(topic, level, mode), {"text": full_content})
Expand Down Expand Up @@ -723,56 +724,101 @@ async def close_stream(stream):
)


async def save_to_history(user, topic: str, levels: list[str], mode: str):
"""Background task to save query to history."""
async def save_to_history(user, topic: str, levels: list[str], mode: str) -> None:
"""
Persist a query to the user's history.

Failures are logged as errors with full context but do not propagate —
history loss is preferable to crashing the response task.
Typically called via _persist_history_safely() for bounded execution.
"""
user_id_hash = anonymize_user_id(str(getattr(user, "id", "") or ""))
topic_hash = anonymize_text(topic)

try:
await ensure_user_exists(user)
supabase = get_supabase_admin()
if not supabase:
logger.error("save_to_history_task_no_supabase_admin")
return

def _fetch_existing():
return (
supabase.table("history")
.select("id, levels")
.eq("user_id", user.id)
.eq("topic", topic)
.execute()
)
except Exception as exc:
logger.error(
"save_to_history_ensure_user_failed",
error=str(exc),
error_type=type(exc).__name__,
user_id_hash=user_id_hash,
sampled=False,
)
return # cannot proceed without a valid user row

existing = await asyncio.to_thread(_fetch_existing)
supabase = get_supabase_admin()
if not supabase:
logger.error(
"save_to_history_no_supabase_admin",
user_id_hash=user_id_hash,
sampled=False,
)
return

normalized_mode = normalize_mode(mode)
normalized_mode = normalize_mode(mode)

try:
existing = await asyncio.to_thread(
lambda: supabase.table("history")
.select("id, levels")
.eq("user_id", user.id)
.eq("topic", topic)
.execute()
)
except Exception as exc:
logger.error(
"save_to_history_fetch_failed",
error=str(exc),
error_type=type(exc).__name__,
user_id_hash=user_id_hash,
topic_hash=topic_hash,
sampled=False,
)
return

try:
data = getattr(existing, "data", None)
if isinstance(data, list) and data and isinstance(data[0], dict):
item_id = data[0].get("id")
existing_levels = set(data[0].get("levels") or [])
new_levels = list(existing_levels.union(set(levels)))
def _update_existing():
return (
supabase.table("history")
.update({"levels": new_levels, "mode": normalized_mode})
.eq("id", item_id)
.execute()
)

await asyncio.to_thread(_update_existing)
await asyncio.to_thread(
lambda: supabase.table("history")
.update({"levels": new_levels, "mode": normalized_mode})
.eq("id", item_id)
.execute()
)
logger.debug(
"save_to_history_updated",
user_id_hash=user_id_hash,
topic_hash=topic_hash,
mode=normalized_mode,
)
else:
def _insert_new():
return (
supabase.table("history")
.insert({"user_id": user.id, "topic": topic, "levels": levels, "mode": normalized_mode})
.execute()
)

await asyncio.to_thread(_insert_new)
await asyncio.to_thread(
lambda: supabase.table("history")
.insert({
"user_id": user.id,
"topic": topic,
"levels": levels,
"mode": normalized_mode,
})
.execute()
)
logger.debug(
"save_to_history_inserted",
user_id_hash=user_id_hash,
topic_hash=topic_hash,
mode=normalized_mode,
)
except Exception as exc:
logger.error(
"save_to_history_task_error",
"save_to_history_write_failed",
error=str(exc),
user_id_hash=anonymize_user_id(str(getattr(user, "id", "") or "") or None),
topic_hash=anonymize_text(topic),
error_type=type(exc).__name__,
user_id_hash=user_id_hash,
topic_hash=topic_hash,
mode=normalized_mode,
sampled=False,
)
55 changes: 55 additions & 0 deletions api/tests/test_query_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,61 @@ async def fake_auth():
assert elapsed >= 0.05


@pytest.mark.asyncio
async def test_save_to_history_logs_error_and_returns_when_supabase_unavailable(
    monkeypatch,
):
    """save_to_history swallows failures: it must log an error and return."""
    fake_user = SimpleNamespace(id="user-hist", email="h@example.com", user_metadata={})
    logged_events = []

    async def stub_ensure_user_exists(_user):
        # Succeeds — the failure under test comes from the admin client instead.
        pass

    def stub_get_supabase_admin():
        # None signals "admin client unavailable" to the code under test.
        return None

    def capture_error(event, **kwargs):
        logged_events.append(event)

    monkeypatch.setattr(query_module, "ensure_user_exists", stub_ensure_user_exists)
    monkeypatch.setattr(query_module, "get_supabase_admin", stub_get_supabase_admin)
    monkeypatch.setattr(query_module.logger, "error", capture_error)

    # The call itself must complete without raising.
    await query_module.save_to_history(fake_user, "topic", ["eli5"], "learning")

    assert any("no_supabase_admin" in event for event in logged_events)


@pytest.mark.asyncio
async def test_save_to_history_logs_error_on_fetch_failure(monkeypatch):
    """A failing history fetch is logged, never propagated to the caller."""
    fake_user = SimpleNamespace(id="user-hist2", email="h2@example.com", user_metadata={})
    logged_events = []

    async def stub_ensure_user_exists(_user):
        pass

    class ExplodingSupabase:
        # Any table access raises, simulating a dead database connection.
        def table(self, _name):
            raise RuntimeError("connection refused")

    def stub_get_supabase_admin():
        return ExplodingSupabase()

    def capture_error(event, **kwargs):
        logged_events.append(event)

    monkeypatch.setattr(query_module, "ensure_user_exists", stub_ensure_user_exists)
    monkeypatch.setattr(query_module, "get_supabase_admin", stub_get_supabase_admin)
    monkeypatch.setattr(query_module.logger, "error", capture_error)

    await query_module.save_to_history(fake_user, "topic", ["eli5"], "learning")

    assert any("fetch_failed" in event for event in logged_events)


@pytest.mark.asyncio
async def test_query_invalid_topic(app_client):
resp = await app_client.post(
Expand Down
89 changes: 41 additions & 48 deletions src/lib/chatStoreUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,12 @@ export const notifyError = (message: string) => {
}
};

export const resolveMessageKey = (message: Message) => {
/**
* Returns the canonical store key for a message.
* Preference order: clientGeneratedId > assistant_client_id > client_id > serverMessageId > id
* This key is used as the dict key in messagesById and as entries in messageIds.
*/
export const resolveMessageKey = (message: Message): string => {
return (
message.clientGeneratedId ||
message.metadata?.assistant_client_id ||
Expand All @@ -191,69 +196,57 @@ export const resolveMessageKey = (message: Message) => {
);
};

export const messagesMatch = (existing: Message, incoming: Message) => {
if (existing.id === incoming.id) return true;
if (
existing.clientGeneratedId &&
incoming.clientGeneratedId &&
existing.clientGeneratedId === incoming.clientGeneratedId
) {
/**
* Returns true if two message objects refer to the same logical message.
*
* Resolution order:
* 1. Canonical clientId match (set at message creation, most reliable)
* 2. Server-assigned id match (for messages loaded from the database)
* 3. Legacy fallback for messages created before this change
*
* Do not add new branches. If a new identity field is introduced,
* add it to the canonical clientId derivation in resolveMessageKey()
* instead.
*/
export const messagesMatch = (existing: Message, incoming: Message): boolean => {
// 1. Canonical client ID — most reliable, set at creation time
const existingClientId =
existing.clientGeneratedId ||
existing.metadata?.assistant_client_id ||
existing.metadata?.client_id;

const incomingClientId =
incoming.clientGeneratedId ||
incoming.metadata?.assistant_client_id ||
incoming.metadata?.client_id;

if (existingClientId && incomingClientId && existingClientId === incomingClientId) {
return true;
}
if (
incoming.metadata?.assistant_client_id &&
existing.clientGeneratedId === incoming.metadata.assistant_client_id
) {
return true;
}
if (
existing.metadata?.assistant_client_id &&
incoming.clientGeneratedId &&
existing.metadata.assistant_client_id === incoming.clientGeneratedId
) {

// 2. Server-assigned UUID — for messages loaded from Supabase that never
// had a client ID (e.g. messages from a previous session)
if (existing.id && incoming.id && existing.id === incoming.id) {
return true;
}

// 3. Cross-reference: server id on one side, serverMessageId on the other
// Handles the transition window when a local message gets its server id
if (
existing.serverMessageId &&
incoming.id &&
existing.serverMessageId === incoming.id
)
) {
return true;
}
if (
incoming.serverMessageId &&
existing.id &&
incoming.serverMessageId === existing.id
)
return true;
if (
incoming.metadata?.client_id &&
existing.id === incoming.metadata.client_id
)
return true;
if (
existing.metadata?.client_id &&
existing.metadata.client_id === incoming.id
)
return true;
if (
incoming.metadata?.client_id &&
existing.clientGeneratedId === incoming.metadata.client_id
)
return true;
if (
existing.metadata?.client_id &&
incoming.clientGeneratedId &&
existing.metadata.client_id === incoming.clientGeneratedId
) {
return true;
}
if (
incoming.metadata?.client_id &&
existing.metadata?.client_id &&
existing.metadata.client_id === incoming.metadata.client_id
) {
return true;
}

return false;
};

Expand Down
Loading
Loading