
feat(observability): add response feedback tracking to openapi #1660

Merged
yeshion23333 merged 12 commits into main from bot-feedback-observability
Apr 30, 2026

Conversation

@myysy
Collaborator

@myysy myysy commented Apr 23, 2026

Summary

This PR adds an end-to-end feedback observability flow for Vikingbot, including stable response_id tracking, explicit feedback submission, and implicit response outcome evaluation.
It connects assistant responses, feedback events, follow-up user turns, and Langfuse metadata so feedback can be attributed at the response level instead of only at the session level.

Type of Change

  • New feature (feat)
  • Bug fix (fix)
  • Documentation (docs)
  • Refactoring (refactor)
  • Other

Testing

Describe how to test these changes:

  • Unit tests pass
  • Manual testing completed

Testing details:

  • Added/updated tests for:
    • response outcome evaluation
    • Langfuse outcome metadata updates
    • OpenAPI response ID propagation
    • explicit feedback persistence and emitted events
    • bot proxy feedback forwarding
    • gateway token enforcement
    • bot gateway bootstrap behavior
  • Verified merge conflict resolution in bot/vikingbot/agent/loop.py keeps both:
    • branch-side final_reasoning_content persistence
    • main-side response_id, RESPONSE_COMPLETED, and outcome evaluation flow
  • Ran Python syntax validation for touched bot observability modules with python -m py_compile ...

Notes:

  • Local pytest execution was blocked by environment issues:
    • missing _sqlite3 for pytest-cov
    • local import environment missing openviking package resolution
  • The new/updated tests are included in the branch and intended to run in CI (pr.yml)

Related Issues

  • Related to feedback observability phase work
  • Related to Langfuse response-level attribution and outcome tracking

Checklist

  • Code follows project style guidelines
  • Tests added for new functionality
  • Documentation updated (if needed)
  • All tests pass

Description

Changes Made

  • add stable response_id generation and propagation for assistant responses
  • return response_id in OpenAPI chat responses and streaming response events
  • add /bot/v1/feedback support in Vikingbot OpenAPI channel
  • add /bot/v1/feedback proxy support in openviking/server/routers/bot.py
  • persist feedback events in session metadata
  • emit analytics events for:
    • RESPONSE_COMPLETED
    • FEEDBACK_SUBMITTED
    • RESPONSE_OUTCOME_EVALUATED
  • add response outcome evaluation helpers for:
    • resolved
    • reasked
    • follow_up
    • abandoned
    • positive_feedback
    • negative_feedback
  • update Langfuse integration so outcomes can be attached back to the originating generation
  • preserve provider-specific reasoning history via reasoning_content
  • add design doc and validation docs for phases 1-3
  • add/extend regression tests around auth, proxying, and outcome behavior

Key Files

  • bot/vikingbot/agent/loop.py
  • bot/vikingbot/channels/openapi.py
  • bot/vikingbot/channels/openapi_models.py
  • bot/vikingbot/observability/outcome.py
  • bot/vikingbot/integrations/langfuse.py
  • bot/vikingbot/utils/tracing.py
  • bot/vikingbot/providers/litellm_provider.py
  • bot/vikingbot/providers/openai_compatible_provider.py
  • openviking/server/routers/bot.py
  • openviking/server/bootstrap.py

Merge Compatibility Notes

This branch has been merged with the latest main.
The only merge conflict was in bot/vikingbot/agent/loop.py, and it was resolved by keeping both sides’ intended behavior:

  • keep this branch’s final_reasoning_content handling and persistence
  • keep main’s response-level tracing and outcome evaluation path based on response_id

This avoids regressions in:

  • feedback attribution design
  • Langfuse generation linking
  • outcome evaluation
  • mainline response lifecycle events
  • provider-specific reasoning history

Additional Notes

Relevant tests added/updated:

  • bot/tests/test_agent_loop_outcome.py
  • bot/tests/test_langfuse_outcome_metadata.py
  • bot/tests/test_outcome_evaluator.py
  • bot/tests/test_openapi_auth.py
  • tests/server/test_bot_proxy_auth.py
  • tests/unit/test_server_bootstrap_bot_gateway.py

Relevant docs added/updated:

  • bot/docs/vikingbot-feedback-observability-design.md
  • bot/docs/vikingbot-langfuse-local-setup.md
  • bot/docs/vikingbot-phase1-validation-with-openviking-server.md
  • bot/docs/vikingbot-phase2-feedback-validation-with-openviking-server.md
  • bot/docs/vikingbot-phase3-outcome-validation-with-openviking-server.md

Persist response, feedback, and outcome signals so OpenAPI clients can attach stable feedback to assistant replies. Sync the same business events to Langfuse to make Q&A effectiveness observable across restarts.
@github-actions

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🏅 Score: 85
🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Performance Concern

_find_latest_response_for_session iterates over all stored responses, leading to O(n) time complexity per follow-up message. This could degrade performance with a large number of responses.

def _find_latest_response_for_session(
    self, session_id: str, channel: str | None = None
) -> Dict[str, Any] | None:
    latest: Dict[str, Any] | None = None
    latest_ts: datetime | None = None
    for response_info in self._response_index.values():
        if response_info.get("session_id") != session_id:
            continue
        if channel and response_info.get("channel") != channel:
            continue
        ts = self._parse_timestamp(response_info.get("timestamp"))
        if ts is None:
            continue
        if latest is None or latest_ts is None or ts > latest_ts:
            latest = response_info
            latest_ts = ts
    return latest
Minor Validation Gap

The feedback endpoint accepts submissions for non-existent response IDs without any validation or warning. This may lead to orphaned feedback records.

async def _handle_feedback(self, request: FeedbackRequest) -> FeedbackResponse:
    """Handle explicit user feedback for a response."""
    feedback_id = str(uuid.uuid4())
    now = datetime.now()
    response_info = self._response_index.get(request.response_id, {})
    record = {
        "event_type": "feedback_submitted",
        "feedback_id": feedback_id,
        "response_id": request.response_id,
        "rating": request.rating.value,
        "comment": request.comment,
        "session_id": request.session_id or response_info.get("session_id"),
        "session_key": response_info.get("session_key"),
        "channel": response_info.get("channel"),
        "user_id": request.user_id or response_info.get("user_id"),
        "timestamp": now.isoformat(),
    }
    await self._store_feedback(record)
    return FeedbackResponse(
        feedback_id=feedback_id,
        response_id=request.response_id,
        accepted=True,
        timestamp=now,
    )
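The validation gap above could be closed by checking the in-memory index before accepting feedback. This is a hedged sketch, not the project's code: the index shape mirrors the snippet above, and FeedbackValidator and its raise-on-unknown contract are assumptions (the real endpoint might instead return accepted=False or log a warning).

```python
from typing import Any, Dict


class FeedbackValidator:
    """Reject feedback for response IDs that were never issued.

    Hypothetical helper: wraps the same _response_index dict used by
    _find_latest_response_for_session in the snippets above.
    """

    def __init__(self, response_index: Dict[str, Dict[str, Any]]) -> None:
        self._response_index = response_index

    def validate(self, response_id: str) -> Dict[str, Any]:
        """Return the indexed response info, or raise for unknown IDs."""
        info = self._response_index.get(response_id)
        if info is None:
            raise KeyError(f"unknown response_id: {response_id}")
        return info
```

Calling validate() at the top of _handle_feedback would prevent orphaned feedback records at the cost of rejecting feedback for responses that predate the index (e.g. after a restart without reloading the persisted file).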

@github-actions

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: General
Use asyncio.to_thread() for blocking file I/O

Run blocking file I/O operations in a thread pool using asyncio.to_thread() to avoid
blocking the async event loop. This improves responsiveness under high load.

bot/vikingbot/channels/openapi.py [266-314]

 async def _store_response(self, msg: OutboundMessage) -> None:
-    ...
-    async with self._response_lock:
+    if not msg.response_id or not msg.response_completed:
+        return
+
+    record = {
+        "event_type": "response_completed",
+        "response_id": msg.response_completed.response_id,
+        "session_id": msg.session_key.chat_id,
+        "session_key": msg.response_completed.session_id,
+        "channel": msg.response_completed.channel,
+        "user_id": msg.response_completed.user_id,
+        "token_usage": msg.response_completed.token_usage,
+        "time_cost": msg.response_completed.time_cost,
+        "iteration": msg.response_completed.iteration,
+        "tools_used_names": msg.response_completed.tools_used_names,
+        "timestamp": msg.response_completed.timestamp.isoformat(),
+    }
+    
+    def _write():
         with open(self._responses_file, "a") as f:
             f.write(json.dumps(record, ensure_ascii=False) + "\n")
-    ...
+    
+    async with self._response_lock:
+        await asyncio.to_thread(_write)
+    self._sync_langfuse_response(record)
 
 async def _store_feedback(self, record: Dict[str, Any]) -> None:
-    ...
-    async with self._feedback_lock:
+    def _write():
         with open(self._feedback_file, "a") as f:
             f.write(json.dumps(record, ensure_ascii=False) + "\n")
-    ...
+    
+    async with self._feedback_lock:
+        await asyncio.to_thread(_write)
+    self._sync_langfuse_feedback(record)
 
 async def _store_outcome(self, record: Dict[str, Any]) -> None:
-    ...
-    async with self._outcome_lock:
+    def _write():
         with open(self._outcomes_file, "a") as f:
             f.write(json.dumps(record, ensure_ascii=False) + "\n")
-    ...
+    
+    async with self._outcome_lock:
+        await asyncio.to_thread(_write)
+    self._sync_langfuse_outcome(record)
Suggestion importance[1-10]: 6


Why: This addresses potential event loop blocking from synchronous file I/O in async functions, improving responsiveness under load. It's a good practice change with moderate impact.

Impact: Low
Optimize latest response lookup with a session index

Maintain a session-to-latest-response index to avoid O(n) scans when finding the
latest response for a session. This reduces lookup time from linear to constant.

bot/vikingbot/channels/openapi.py [219-235]

+def __init__(...):
+    ...
+    self._response_index: Dict[str, Dict[str, Any]] = {}
+    self._session_latest_response: Dict[str, str] = {}  # session_id -> response_id
+    ...
+
+def _index_response(self, msg: OutboundMessage) -> None:
+    if not msg.response_id or not msg.response_completed:
+        return
+
+    self._response_index[msg.response_id] = {
+        "response_id": msg.response_id,
+        "session_id": msg.session_key.chat_id,
+        "session_key": msg.response_completed.session_id,
+        "channel": msg.session_key.channel_key(),
+        "user_id": msg.response_completed.user_id if msg.response_completed else None,
+        "event_type": msg.event_type.value,
+        "timestamp": (
+            msg.response_completed.timestamp.isoformat()
+            if msg.response_completed
+            else datetime.now().isoformat()
+        ),
+    }
+    # Update session latest response index
+    session_id = msg.session_key.chat_id
+    current_latest_id = self._session_latest_response.get(session_id)
+    if not current_latest_id:
+        self._session_latest_response[session_id] = msg.response_id
+    else:
+        current_latest = self._response_index.get(current_latest_id)
+        if current_latest:
+            current_ts = self._parse_timestamp(current_latest.get("timestamp"))
+            new_ts = self._parse_timestamp(self._response_index[msg.response_id]["timestamp"])
+            if current_ts is None or (new_ts is not None and new_ts > current_ts):
+                self._session_latest_response[session_id] = msg.response_id
+
+def _load_response_index(self) -> None:
+    if not self._responses_file.exists():
+        return
+
+    try:
+        with open(self._responses_file) as f:
+            for line in f:
+                line = line.strip()
+                if not line:
+                    continue
+                data = json.loads(line)
+                response_id = data.get("response_id")
+                if not response_id:
+                    continue
+                self._response_index[response_id] = data
+                # Update session latest response index
+                session_id = data.get("session_id")
+                if session_id:
+                    current_latest_id = self._session_latest_response.get(session_id)
+                    if not current_latest_id:
+                        self._session_latest_response[session_id] = response_id
+                    else:
+                        current_latest = self._response_index.get(current_latest_id)
+                        if current_latest:
+                            current_ts = self._parse_timestamp(current_latest.get("timestamp"))
+                            new_ts = self._parse_timestamp(data.get("timestamp"))
+                            if current_ts is None or (new_ts is not None and new_ts > current_ts):
+                                self._session_latest_response[session_id] = response_id
+    except Exception as e:
+        logger.warning(f"Failed to load persisted response index: {e}")
+
 def _find_latest_response_for_session(
     self, session_id: str, channel: str | None = None
 ) -> Dict[str, Any] | None:
-    latest: Dict[str, Any] | None = None
-    latest_ts: datetime | None = None
-    for response_info in self._response_index.values():
-        if response_info.get("session_id") != session_id:
-            continue
-        if channel and response_info.get("channel") != channel:
-            continue
-        ts = self._parse_timestamp(response_info.get("timestamp"))
-        if ts is None:
-            continue
-        if latest is None or latest_ts is None or ts > latest_ts:
-            latest = response_info
-            latest_ts = ts
-    return latest
+    latest_response_id = self._session_latest_response.get(session_id)
+    if not latest_response_id:
+        return None
+    response_info = self._response_index.get(latest_response_id)
+    if not response_info:
+        return None
+    if channel and response_info.get("channel") != channel:
+        return None
+    return response_info
Suggestion importance[1-10]: 5


Why: This is a valid performance optimization that reduces lookup time from O(n) to O(1), improving efficiency especially with many responses. It's a moderate improvement, not critical for functionality.

Impact: Low

myysy added 9 commits April 23, 2026 16:00
This commit removes the response tracking, feedback submission, and Langfuse integration features. The changes include:
- Removing ResponseCompletedEvent and related response_id fields
- Removing feedback models and endpoints
- Removing Langfuse client integration
- Simplifying the OpenAPI channel by removing response persistence
- Updating tests to reflect these removals
# Conflicts:
#	bot/vikingbot/agent/loop.py
- remove unnecessary blank lines
- reorder imports to follow conventions
- fix string formatting in logging
- group related imports together
- fix import ordering in multiple files
@myysy myysy requested a review from yeshion23333 April 30, 2026 07:52
Collaborator

@yeshion23333 yeshion23333 left a comment


The direction of this change is right (the response_id + feedback + outcome + Langfuse linking chain is largely complete), but a few issues need fixing first, so I suggest holding off on merging:

  1. Blocking: CI lint fails
    • bot/vikingbot/agent/memory.py:57: F841, variable abstract is assigned but never used
    • bot/vikingbot/channels/mochat.py:189: F821, MochatConfig is undefined
  2. High risk: concurrent overwrites in the feedback write-back path
    • In bot/vikingbot/channels/openapi.py, _handle_feedback reads the session through a separate SessionManager, modifies metadata, then saves the full session
    • In bot/vikingbot/session/manager.py:221, save rewrites the entire file
    • When AgentLoop writes the session concurrently, a stale snapshot can overwrite newer messages/metadata
  3. Semantics: abandoned does not match the design doc
    • The current implementation is abandoned = bool(user_messages) and not relevant_feedback (bot/vikingbot/observability/outcome.py)
    • That reads as "there are follow-up user messages but no feedback", which differs from the doc's "user leaves/gives up after the answer"
  4. Metric pollution risk: heartbeats may count as follow-ups in outcome evaluation
    • _evaluate_previous_response_outcome runs uniformly before every inbound message (bot/vikingbot/agent/loop.py)
    • A heartbeat is also an inbound message and may falsely trigger a reasked/follow_up verdict
  5. Data quality: feedback request constraints are weak
    • feedback_score on FeedbackRequest has no range or conditional constraints
    • user_id can be overridden by the request body, which undermines the reliability of later attribution statistics

At a minimum, please:

  • fix the lint failures
  • make concurrent session writes consistent (avoid whole-file overwrites)
  • align the abandoned semantics and exclude heartbeat interference
  • add regression tests covering concurrency and outcome semantics

With these points addressed, this PR will be close to mergeable.

myysy added 2 commits April 30, 2026 16:23
Remove unused abstract attribute from MemoryStore and update config parameter type in resolve_require_mention to MochatChannelConfig for better type clarity

refactor(session): add session locking and metadata merging
- Implement per-session file locks to prevent concurrent write conflicts
- Add metadata merging logic to preserve existing data during updates
- Introduce update_session method for atomic read-modify-write operations

fix(agent): skip outcome evaluation for heartbeat messages
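The per-session locking described in the refactor(session) commit above might look like the following sketch. SessionStore, the update_session signature, and the in-memory storage are assumptions for illustration, not the actual bot/vikingbot/session/manager.py code (which locks around file reads/writes rather than a dict).

```python
import asyncio
from collections import defaultdict
from typing import Any, Callable, Dict


class SessionStore:
    """Hypothetical store showing atomic read-modify-write per session."""

    def __init__(self) -> None:
        self._sessions: Dict[str, Dict[str, Any]] = {}
        # One lock per session_id; defaultdict creates locks lazily.
        self._locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def update_session(
        self, session_id: str, mutate: Callable[[Dict[str, Any]], None]
    ) -> Dict[str, Any]:
        """Re-read, mutate, and save under the session's lock.

        Because the read happens inside the lock, a stale snapshot can
        never overwrite a newer concurrent write.
        """
        async with self._locks[session_id]:
            session = self._sessions.get(session_id, {"metadata": {}})
            mutate(session)
            self._sessions[session_id] = session
            return session
```

Both _handle_feedback and AgentLoop would route writes through update_session, replacing the read-then-full-save pattern the review flagged.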
@yeshion23333
Copy link
Copy Markdown
Collaborator

Thanks for this round of fixes. The issues were accurately diagnosed and the changes are well executed. Great work!

@yeshion23333 yeshion23333 merged commit 21854b5 into main Apr 30, 2026
6 checks passed
@yeshion23333 yeshion23333 deleted the bot-feedback-observability branch April 30, 2026 09:12
@github-project-automation github-project-automation Bot moved this from Backlog to Done in OpenViking project Apr 30, 2026