# Context Engineering for AI Agents - 上下文工程實戰指南

## 📋 本筆記本涵蓋內容

基於Manus博客的上下文工程核心概念，本筆記本將實現以下技術：

1. **KV-Cache優化** - 通過穩定提示前綴實現緩存友好的上下文管理
2. **工具遮罩管理** - 避免破壞緩存的動態工具可用性控制
3. **文件系統外部內存** - 突破上下文窗口限制的策略
4. **注意力操控** - 通過復述維持任務焦點
5. **錯誤保留學習** - 將失敗轉化為學習信號
6. **Kimi K2集成** - 使用月之暗面的Kimi K2模型實現生產級部署

## 🎯 學習目標

理解並掌握生產級AI Agent系統中的上下文工程最佳實踐，重點關注：
- 緩存命中率優化（成本降低10倍）
- 長期任務的注意力維持
- 可擴展的外部內存架構
- 錯誤驅動的自適應學習

## 🔧 技術棧
- **LLM**: Kimi K2 (moonshot-v1-32k)
- **API**: Moonshot AI Platform
- **安全性**: 運行時API密鑰輸入

## 🔐 安全的API密鑰配置

根據上下文工程最佳實踐，我們需要安全地處理API密鑰，避免硬編碼在代碼中。

In [None]:
# 基礎環境設置和依賴導入
import os
import json
import time
import asyncio
import getpass
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import contextlib
import subprocess

# HTTP客戶端
import httpx

# 數據處理
import numpy as np
import pandas as pd

print("✅ 基礎環境設置完成")

## 1️⃣ KV-Cache優化的上下文管理

**核心概念**: KV-Cache命中率是生產AI agents最重要的指標，直接影響延遲和成本
- 緩存成本: $0.30/MTok vs 未緩存: $3.00/MTok (10倍差異)
- 穩定提示前綴至關重要
- Append-only設計保持緩存有效性

In [None]:
# 安全的API密鑰輸入 - 運行時配置，避免硬編碼洩露
def setup_kimi_api():
    """
    安全地設置Kimi K2 API配置
    這是上下文工程中的安全實踐 - 永不在代碼中硬編碼敏感信息
    """
    api_key = getpass.getpass("請輸入您的Kimi API密鑰: ")
    
    # Kimi K2 API 配置
    config = {
        "api_key": api_key,
        "base_url": "https://api.moonshot.cn/v1",
        "model": "moonshot-v1-32k",  # Kimi K2 模型
        "max_tokens": 2048,
        "temperature": 0.3
    }
    
    print("✅ Kimi K2 API配置完成")
    print(f"📊 模型: {config['model']}")
    print(f"🔗 端點: {config['base_url']}")
    
    return config

# 執行配置
# kimi_config = setup_kimi_api()

## 2️⃣ 工具遮罩管理機制

**核心概念**: 通過遮罩token logits而非移除工具定義來管理動態動作空間
- 避免破壞KV-cache的工具移除
- 使用前綴命名約定實現工具組限制
- 支持Auto/Required/Specified三種函數調用模式

In [None]:
@dataclass
class ContextOptimizedAgent:
    """
    KV-Cache優化的AI Agent實現
    
    設計原則:
    1. 穩定的系統提示 (100% 緩存命中)
    2. Append-only 上下文構建
    3. 確定性序列化避免緩存失效
    """
    system_prompt: str = "你是一個專業的AI助手，擅長上下文工程和任務執行。"
    base_context: str = "當前會話上下文: "
    
    def __post_init__(self):
        self.session_history = []
        self.cache_metrics = {
            "total_requests": 0,
            "cache_hits": 0,
            "cache_misses": 0
        }
    
    def build_prompt(self, user_input: str, preserve_cache: bool = True) -> str:
        """
        構建KV-Cache友好的提示
        
        Args:
            user_input: 用戶輸入
            preserve_cache: 是否保持緩存結構
            
        Returns:
            優化的提示字符串
        """
        if preserve_cache:
            # Cache-friendly append-only 構建
            context_parts = [
                self.system_prompt,      # 穩定前綴 - 緩存
                self.base_context,       # 穩定前綴 - 緩存  
                *self.session_history,   # 增量緩存
                f"用戶: {user_input}"      # 新內容
            ]
        else:
            # 非緩存友好方式 (僅用於對比)
            context_parts = [
                f"系統提示 ({len(self.session_history)} 條歷史): {self.system_prompt}",
                *self.session_history,
                f"當前用戶輸入: {user_input}"
            ]
        
        return "\\n".join(context_parts)
    
    def add_interaction(self, user_input: str, assistant_response: str):
        """添加交互到會話歷史 - Append-only 設計"""
        interaction = f"用戶: {user_input}\\n助手: {assistant_response}"
        self.session_history.append(interaction)
        
    def deterministic_serialize(self, data: dict) -> str:
        """
        確定性JSON序列化 - 保證穩定的鍵排序
        防止因鍵順序變化導致的緩存失效
        """
        return json.dumps(data, sort_keys=True, ensure_ascii=False, separators=(',', ':'))
    
    def estimate_cache_efficiency(self) -> float:
        """估算緩存效率"""
        if self.cache_metrics["total_requests"] == 0:
            return 0.0
        return self.cache_metrics["cache_hits"] / self.cache_metrics["total_requests"]

# 創建實例並測試
cache_agent = ContextOptimizedAgent()

print("✅ KV-Cache優化Agent創建完成")
print(f"📊 系統提示長度: {len(cache_agent.system_prompt)} 字符")
print(f"🔄 緩存策略: Append-only + 穩定前綴")

In [None]:
# 先安裝必要的可視化庫 (如果沒有的話)
# !pip install matplotlib seaborn plotly ipywidgets

import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, HTML, clear_output
import ipywidgets as widgets
from ipywidgets import interact, interactive, fixed, interact_manual
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import threading
import time

class ContextEngineeringDashboard:
    """
    上下文工程交互式監控儀表板
    
    提供實時監控和可視化:
    - 系統性能指標
    - 緩存效率追踪
    - 錯誤模式分析
    - 注意力管理統計
    """
    
    def __init__(self, agent=None):
        self.agent = agent
        self.is_monitoring = False
        self.monitoring_thread = None
        
        # 創建儀表板組件
        self.setup_widgets()
        
        print("📊 上下文工程儀表板初始化完成")
        
    def setup_widgets(self):
        """設置儀表板組件"""
        
        # 控制面板
        self.start_button = widgets.Button(
            description='開始監控',
            disabled=False,
            button_style='success',
            tooltip='開始實時監控',
            icon='play'
        )
        
        self.stop_button = widgets.Button(
            description='停止監控', 
            disabled=True,
            button_style='danger',
            tooltip='停止實時監控',
            icon='stop'
        )
        
        self.refresh_button = widgets.Button(
            description='手動刷新',
            disabled=False,
            button_style='info',
            tooltip='手動刷新數據',
            icon='refresh'
        )
        
        # 狀態顯示
        self.status_text = widgets.HTML(
            value="<b>狀態:</b> 等待開始監控...",
            placeholder='系統狀態',
            description='',
        )
        
        # 指標顯示
        self.metrics_output = widgets.Output()
        self.charts_output = widgets.Output()
        
        # 綁定事件
        self.start_button.on_click(self.start_monitoring)
        self.stop_button.on_click(self.stop_monitoring)
        self.refresh_button.on_click(self.refresh_data)
        
    def start_monitoring(self, button):
        """開始監控"""
        self.is_monitoring = True
        self.start_button.disabled = True
        self.stop_button.disabled = False
        self.status_text.value = "<b>狀態:</b> <span style='color: green'>監控中...</span>"
        
        # 啟動監控線程
        self.monitoring_thread = threading.Thread(target=self.monitor_loop)
        self.monitoring_thread.daemon = True
        self.monitoring_thread.start()
        
        print("🚀 開始實時監控")
        
    def stop_monitoring(self, button):
        """停止監控"""
        self.is_monitoring = False
        self.start_button.disabled = False
        self.stop_button.disabled = True
        self.status_text.value = "<b>狀態:</b> <span style='color: red'>已停止</span>"
        
        print("⏹️ 停止監控")
        
    def refresh_data(self, button):
        """手動刷新數據"""
        self.update_dashboard()
        print("🔄 數據已刷新")
        
    def monitor_loop(self):
        """監控循環"""
        while self.is_monitoring:
            self.update_dashboard()
            time.sleep(2)  # 每2秒更新一次
            
    def update_dashboard(self):
        """更新儀表板數據"""
        with self.metrics_output:
            clear_output(wait=True)
            self.display_metrics()
            
        with self.charts_output:
            clear_output(wait=True)
            self.display_charts()
    
    def display_metrics(self):
        """顯示關鍵指標"""
        if not self.agent:
            print("⚠️ 未連接代理，顯示示例數據")
            
            # 示例數據
            metrics = {
                "總請求數": 156,
                "緩存命中數": 134,
                "緩存效率": "85.9%",
                "錯誤數": 3,
                "總Token數": 45230,
                "活躍工具數": 8,
                "注意力復述次數": 12
            }
        else:
            # 從實際代理獲取數據
            metrics = self.agent.get_performance_stats()
            
        # 創建指標卡片
        html_content = """
        <div style="display: flex; flex-wrap: wrap; gap: 10px; margin: 10px 0;">
        """
        
        colors = ['#3498db', '#2ecc71', '#f39c12', '#e74c3c', '#9b59b6', '#1abc9c', '#34495e']
        
        for i, (key, value) in enumerate(metrics.items()):
            if key == "錯誤洞察":
                continue  # 跳過複雜的錯誤洞察數據
                
            color = colors[i % len(colors)]
            html_content += f"""
            <div style="
                background: linear-gradient(135deg, {color}, {color}dd);
                color: white;
                padding: 15px;
                border-radius: 8px;
                min-width: 150px;
                text-align: center;
                box-shadow: 0 2px 4px rgba(0,0,0,0.1);
            ">
                <div style="font-size: 24px; font-weight: bold;">{value}</div>
                <div style="font-size: 12px; opacity: 0.9;">{key}</div>
            </div>
            """
            
        html_content += "</div>"
        
        display(HTML(html_content))
        
        # 顯示當前時間
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        display(HTML(f"<p style='text-align: center; color: #666; font-size: 12px;'>最後更新: {current_time}</p>"))
    
    def display_charts(self):
        """顯示圖表"""
        # 創建示例數據用於圖表
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('緩存效率趨勢', '工具使用分佈', '錯誤類型統計', '注意力復述頻率'),
            specs=[[{"secondary_y": False}, {"type": "pie"}],
                   [{"type": "bar"}, {"type": "scatter"}]]
        )
        
        # 1. 緩存效率趨勢
        time_points = list(range(1, 11))
        cache_efficiency = [78, 82, 85, 87, 89, 88, 91, 86, 88, 90]
        
        fig.add_trace(
            go.Scatter(x=time_points, y=cache_efficiency, 
                      name='緩存效率%', line=dict(color='#2ecc71', width=3)),
            row=1, col=1
        )
        
        # 2. 工具使用分佈
        tools = ['file_read', 'browser_navigate', 'shell_execute', 'db_query']
        usage = [35, 28, 20, 17]
        
        fig.add_trace(
            go.Pie(labels=tools, values=usage, name="工具使用"),
            row=1, col=2
        )
        
        # 3. 錯誤類型統計
        error_types = ['invalid_arguments', 'permission_denied', 'timeout', 'connection_error']
        error_counts = [8, 5, 3, 2]
        
        fig.add_trace(
            go.Bar(x=error_types, y=error_counts, name='錯誤數量',
                   marker_color=['#e74c3c', '#f39c12', '#9b59b6', '#34495e']),
            row=2, col=1
        )
        
        # 4. 注意力復述頻率
        recitation_times = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
        recitation_count = [1, 1, 2, 1, 3, 2, 1, 2, 1, 2]
        
        fig.add_trace(
            go.Scatter(x=recitation_times, y=recitation_count,
                      mode='markers+lines', name='復述次數',
                      marker=dict(size=8, color='#f39c12')),
            row=2, col=2
        )
        
        # 更新佈局
        fig.update_layout(
            height=600,
            showlegend=False,
            title_text="上下文工程系統監控儀表板",
            title_x=0.5
        )
        
        # 更新軸標籤
        fig.update_xaxes(title_text="時間點", row=1, col=1)
        fig.update_yaxes(title_text="效率%", row=1, col=1)
        
        fig.update_xaxes(title_text="錯誤類型", row=2, col=1)
        fig.update_yaxes(title_text="次數", row=2, col=1)
        
        fig.update_xaxes(title_text="步驟", row=2, col=2)
        fig.update_yaxes(title_text="復述次數", row=2, col=2)
        
        fig.show()
    
    def create_dashboard_layout(self):
        """創建儀表板佈局"""
        control_panel = widgets.HBox([
            self.start_button,
            self.stop_button, 
            self.refresh_button
        ])
        
        dashboard_layout = widgets.VBox([
            widgets.HTML("<h2>🎛️ 上下文工程監控儀表板</h2>"),
            control_panel,
            self.status_text,
            widgets.HTML("<h3>📊 關鍵指標</h3>"),
            self.metrics_output,
            widgets.HTML("<h3>📈 可視化圖表</h3>"),
            self.charts_output
        ])
        
        return dashboard_layout
    
    def display(self):
        """顯示儀表板"""
        layout = self.create_dashboard_layout()
        display(layout)
        
        # 初始化顯示
        self.update_dashboard()

# 創建並顯示儀表板
print("🎛️ 創建上下文工程監控儀表板...")

# 如果有agent實例，可以傳入以獲取真實數據
# dashboard = ContextEngineeringDashboard(agent=kimi_agent)  # 如果有實際代理
dashboard = ContextEngineeringDashboard()  # 使用示例數據

print("✅ 儀表板創建完成!")
print("💡 使用說明:")
print("1. 點擊'開始監控'按鈕啟動實時監控")
print("2. 點擊'停止監控'按鈕停止監控") 
print("3. 點擊'手動刷新'按鈕手動更新數據")
print("4. 監控期間儀表板會每2秒自動更新")

# 顯示儀表板
dashboard.display()

## 3️⃣ 文件系統作為無限上下文

**核心概念**: 現代LLMs的128K+上下文窗口對真實agentic場景仍不足
- 文件系統提供無限大小、持久性的上下文存儲
- 壓縮策略設計為可恢復的 
- 類Neural Turing Machine架構，外化長期狀態

In [None]:
class MaskedToolAgent:
    """
    工具遮罩管理的AI Agent
    
    核心原理:
    - 保持所有工具定義在上下文中 (保護KV-cache)
    - 通過狀態機控制工具可用性
    - 使用前綴命名約定簡化管理
    """
    
    def __init__(self):
        # 定義所有可用工具 - 穩定定義，不會動態修改
        self.all_tools = {
            "browser_navigate": "導航到指定URL",
            "browser_click": "點擊頁面元素", 
            "browser_extract": "提取頁面內容",
            "file_read": "讀取文件內容",
            "file_write": "寫入文件內容",
            "file_search": "搜索文件",
            "shell_execute": "執行Shell命令",
            "shell_monitor": "監控進程狀態",
            "db_query": "查詢數據庫",
            "db_update": "更新數據記錄"
        }
        
        # 工具組定義 - 基於前綴的分組
        self.tool_groups = {
            "browser": ["browser_navigate", "browser_click", "browser_extract"],
            "file": ["file_read", "file_write", "file_search"],
            "shell": ["shell_execute", "shell_monitor"], 
            "database": ["db_query", "db_update"]
        }
        
        # 當前狀態和可用工具組
        self.current_state = "idle"
        self.active_tool_groups = []
        
    def set_state(self, state: str, allowed_groups: List[str]):
        """
        設置當前狀態和允許的工具組
        這不會修改工具定義，只影響可用性判斷
        """
        self.current_state = state
        self.active_tool_groups = allowed_groups
        
        print(f"🔄 狀態切換: {state}")
        print(f"🛠️ 可用工具組: {', '.join(allowed_groups)}")
    
    def get_available_tools(self) -> List[str]:
        """
        獲取當前狀態下可用的工具
        基於工具組前綴過濾，而非移除定義
        """
        available = []
        for group in self.active_tool_groups:
            if group in self.tool_groups:
                available.extend(self.tool_groups[group])
        return available
    
    def build_tool_prompt(self) -> str:
        """
        構建工具提示 - 包含所有工具定義但標記可用性
        這保持了KV-cache的穩定性
        """
        available_tools = self.get_available_tools()
        
        prompt_parts = ["# 工具定義 (KV-Cache穩定結構)\\n"]
        
        for tool_name, description in self.all_tools.items():
            # 標記工具狀態而不移除定義
            status = "✅ 可用" if tool_name in available_tools else "⭕ 已遮罩"
            prompt_parts.append(f"- {tool_name}: {description} [{status}]")
            
        prompt_parts.append("\\n# 當前可調用工具:")
        if available_tools:
            for tool in available_tools:
                prompt_parts.append(f"- {tool}")
        else:
            prompt_parts.append("- 無可用工具")
            
        return "\\n".join(prompt_parts)
    
    def is_tool_available(self, tool_name: str) -> bool:
        """檢查工具是否在當前狀態下可用"""
        return tool_name in self.get_available_tools()
    
    def simulate_logit_masking(self, tool_name: str) -> str:
        """
        模擬logit遮罩過程
        在實際部署中，這會在模型解碼階段執行
        """
        if self.is_tool_available(tool_name):
            return f"✅ {tool_name} - logit權重: 正常"
        else:
            return f"🚫 {tool_name} - logit權重: -inf (遮罩)"

# 創建並測試工具遮罩代理
tool_agent = MaskedToolAgent()

print("✅ 工具遮罩代理創建完成")
print(f"📊 總工具數: {len(tool_agent.all_tools)}")
print(f"🔧 工具組數: {len(tool_agent.tool_groups)}")

# 測試不同狀態下的工具可用性
print("\\n" + "="*50)
print("測試: 瀏覽器任務狀態")
tool_agent.set_state("browsing", ["browser", "file"])
print("\\n可用工具:", tool_agent.get_available_tools())

## 4️⃣ 注意力操控通過復述機制

**核心概念**: Agent應通過不斷復述目標來操控自己的注意力
- 避免"迷失在中間"的問題
- Todo.md文件作為注意力操控機制
- 平均Manus任務需要約50次工具調用，注意力漂移是重要問題

In [None]:
class FileSystemContext:
    """
    文件系統外部內存實現
    
    設計原理:
    - 文件系統作為無限大小的上下文存儲
    - 壓縮策略保證可恢復性
    - 選擇性注意機制決定何時檢索
    """
    
    def __init__(self, workspace_dir: str = "./agent_workspace"):
        self.workspace = workspace_dir
        self.metadata_file = os.path.join(workspace_dir, "_metadata.json")
        
        # 創建工作空間
        os.makedirs(workspace_dir, exist_ok=True)
        
        # 初始化元數據
        self.metadata = self._load_metadata()
        
        print(f"📁 工作空間初始化: {workspace_dir}")
        
    def _load_metadata(self) -> dict:
        """加載或創建元數據"""
        if os.path.exists(self.metadata_file):
            with open(self.metadata_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {
            "created_at": datetime.now().isoformat(),
            "context_files": {},
            "compression_index": {},
            "access_patterns": {}
        }
    
    def _save_metadata(self):
        """保存元數據"""
        with open(self.metadata_file, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, ensure_ascii=False, indent=2)
    
    def save_context(self, key: str, data: Any, context_type: str = "general") -> str:
        """
        保存大型上下文數據到文件系統
        
        Args:
            key: 上下文標識符
            data: 要保存的數據
            context_type: 上下文類型 (web_page, document, conversation等)
            
        Returns:
            文件路徑
        """
        timestamp = datetime.now().isoformat()
        filename = f"{key}_{context_type}_{timestamp.replace(':', '-')}.json"
        filepath = os.path.join(self.workspace, filename)
        
        # 保存數據
        with open(filepath, 'w', encoding='utf-8') as f:
            if isinstance(data, dict):
                json.dump(data, f, ensure_ascii=False, indent=2)
            else:
                json.dump({"content": str(data), "type": context_type}, f, ensure_ascii=False, indent=2)
        
        # 更新元數據
        self.metadata["context_files"][key] = {
            "filepath": filepath,
            "type": context_type,
            "created_at": timestamp,
            "size_bytes": os.path.getsize(filepath),
            "access_count": 0
        }
        self._save_metadata()
        
        print(f"💾 上下文已保存: {key} -> {filename}")
        return filepath
    
    def create_reference_prompt(self, key: str, include_summary: bool = True) -> str:
        """
        創建文件引用提示而非包含完整內容
        這是突破上下文限制的關鍵技術
        """
        if key not in self.metadata["context_files"]:
            return f"[錯誤: 上下文文件 '{key}' 不存在]"
        
        file_info = self.metadata["context_files"][key]
        
        if include_summary:
            summary = self._generate_file_summary(file_info["filepath"])
            return f"""[外部上下文引用]
文件: {key}
類型: {file_info['type']}
大小: {file_info['size_bytes']} 字節
摘要: {summary}
完整內容可通過 load_context('{key}') 獲取"""
        else:
            return f"[上下文文件: {key}.json 在工作空間中可用]"
    
    def _generate_file_summary(self, filepath: str, max_chars: int = 200) -> str:
        """生成文件內容摘要"""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
                if len(content) <= max_chars:
                    return content
                return content[:max_chars] + "..."
        except Exception as e:
            return f"無法生成摘要: {str(e)}"
    
    def load_context(self, key: str) -> Optional[Any]:
        """根據需要加載上下文內容"""
        if key not in self.metadata["context_files"]:
            return None
        
        file_info = self.metadata["context_files"][key]
        filepath = file_info["filepath"]
        
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
                
            # 更新訪問統計
            file_info["access_count"] += 1
            file_info["last_accessed"] = datetime.now().isoformat()
            self._save_metadata()
            
            print(f"📖 已加載上下文: {key}")
            return data
            
        except Exception as e:
            print(f"❌ 加載失敗: {key} - {str(e)}")
            return None
    
    def compress_and_archive(self, key: str, compression_strategy: str = "summary") -> str:
        """
        壓縮上下文但保證可恢復性
        
        strategies:
        - summary: 保留摘要和關鍵信息
        - url_only: 僅保留URL (適用於網頁)
        - path_only: 僅保留路徑 (適用於文檔)
        """
        if key not in self.metadata["context_files"]:
            return f"上下文 '{key}' 不存在"
        
        original_data = self.load_context(key)
        if not original_data:
            return f"無法加載 '{key}' 進行壓縮"
        
        if compression_strategy == "summary":
            compressed = {
                "type": "compressed_summary",
                "original_key": key,
                "summary": self._generate_file_summary(self.metadata["context_files"][key]["filepath"], 500),
                "key_points": self._extract_key_points(original_data),
                "restoration_hint": "原始內容已歸檔，可通過restore_context恢復"
            }
        elif compression_strategy == "url_only":
            compressed = {
                "type": "compressed_url",
                "original_key": key,
                "url": original_data.get("url", ""),
                "title": original_data.get("title", ""),
                "restoration_hint": f"完整內容可通過重新訪問URL獲取"
            }
        else:  # path_only
            compressed = {
                "type": "compressed_path",
                "original_key": key, 
                "file_path": original_data.get("path", ""),
                "restoration_hint": "完整內容可通過重新讀取文件獲取"
            }
        
        # 保存壓縮版本
        compressed_key = f"{key}_compressed"
        self.save_context(compressed_key, compressed, "compressed")
        
        # 記錄壓縮映射
        self.metadata["compression_index"][key] = compressed_key
        self._save_metadata()
        
        return f"✅ 已壓縮 '{key}' -> '{compressed_key}'"
    
    def _extract_key_points(self, data: Any, max_points: int = 5) -> List[str]:
        """提取關鍵要點 - 簡化實現"""
        if isinstance(data, dict):
            points = []
            for k, v in data.items():
                if isinstance(v, str) and len(v) > 20:
                    points.append(f"{k}: {str(v)[:100]}...")
                if len(points) >= max_points:
                    break
            return points
        else:
            content = str(data)
            sentences = content.split('。')[:max_points]
            return [s.strip() + '。' for s in sentences if s.strip()]
    
    def get_workspace_summary(self) -> str:
        """獲取工作空間摘要"""
        total_files = len(self.metadata["context_files"])
        total_size = sum(info["size_bytes"] for info in self.metadata["context_files"].values())
        
        return f"""📊 外部內存工作空間狀態:
- 總文件數: {total_files}
- 總大小: {total_size:,} 字節
- 壓縮文件數: {len(self.metadata["compression_index"])}
- 工作空間路徑: {self.workspace}"""

# 創建並測試文件系統上下文
fs_context = FileSystemContext()

print("✅ 文件系統上下文管理器創建完成")
print(fs_context.get_workspace_summary())

## 5️⃣ 錯誤保留學習機制

**核心概念**: 保留錯誤和失敗在上下文中，而非清理它們
- 錯誤提供模型適應所需的證據
- 失敗嘗試通過上下文內學習改善後續決策
- 錯誤恢復是真正agentic行為的指標

In [None]:
class RecitationAgent:
    """
    通過復述進行注意力操控的AI Agent
    
    核心策略:
    - 持續將目標推入最近的注意力範圍
    - 使用Todo.md作為注意力錨定機制
    - 在長任務序列中維持焦點
    """
    
    def __init__(self, primary_objective: str):
        self.primary_objective = primary_objective
        self.todo_items = []
        self.completed_tasks = []
        self.current_step = 0
        self.max_attention_drift = 10  # 最多10步後必須復述
        self.steps_since_recitation = 0
        
        print(f"🎯 主要目標設定: {primary_objective}")
        
    def add_todo(self, task: str, priority: str = "medium"):
        """添加待辦事項"""
        todo_item = {
            "id": len(self.todo_items) + 1,
            "task": task,
            "priority": priority,
            "created_at": datetime.now().isoformat(),
            "status": "pending"
        }
        self.todo_items.append(todo_item)
        print(f"📝 新增待辦: {task}")
        
    def complete_task(self, task_id: int, result: str = ""):
        """完成任務並觸發注意力復述"""
        for todo in self.todo_items:
            if todo["id"] == task_id and todo["status"] == "pending":
                todo["status"] = "completed"
                todo["completed_at"] = datetime.now().isoformat()
                todo["result"] = result
                
                self.completed_tasks.append(todo)
                print(f"✅ 任務完成: {todo['task']}")
                
                # 觸發注意力復述
                self._trigger_recitation(f"剛完成任務: {todo['task']}")
                break
                
    def _trigger_recitation(self, context: str = ""):
        """觸發注意力復述機制"""
        self.steps_since_recitation = 0
        recitation = self._generate_recitation(context)
        print(f"🔄 注意力復述:\\n{recitation}")
        return recitation
        
    def _generate_recitation(self, context: str = "") -> str:
        """生成注意力復述內容"""
        pending_tasks = [t for t in self.todo_items if t["status"] == "pending"]
        high_priority = [t for t in pending_tasks if t["priority"] == "high"]
        
        recitation_parts = [
            f"🎯 主要目標: {self.primary_objective}",
            "",
            f"📊 進度狀態: {len(self.completed_tasks)} 已完成 / {len(pending_tasks)} 待處理",
            ""
        ]
        
        if context:
            recitation_parts.extend([f"🔄 當前情況: {context}", ""])
            
        if high_priority:
            recitation_parts.append("🔥 高優先級任務:")
            for task in high_priority:
                recitation_parts.append(f"  - {task['task']}")
            recitation_parts.append("")
            
        if pending_tasks:
            recitation_parts.append("📋 接下來要做:")
            for task in pending_tasks[:3]:  # 只顯示前3個
                recitation_parts.append(f"  {task['id']}. {task['task']}")
                
        recitation_parts.extend([
            "",
            "⚡ 保持專注，不要偏離主要目標！"
        ])
        
        return "\\n".join(recitation_parts)
    
    def step_forward(self, action_description: str) -> Optional[str]:
        """
        執行一個步驟並檢查是否需要注意力復述
        
        Returns:
            如果需要復述則返回復述內容，否則返回None
        """
        self.current_step += 1
        self.steps_since_recitation += 1
        
        print(f"👣 步驟 {self.current_step}: {action_description}")
        
        # 檢查是否需要強制復述
        if self.steps_since_recitation >= self.max_attention_drift:
            return self._trigger_recitation("已執行多個步驟，需要重新對焦")
        
        return None
    
    def generate_todo_md(self) -> str:
        """生成Todo.md格式的注意力錨定文件"""
        md_content = [
            f"# {self.primary_objective}",
            "",
            f"**創建時間**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"**當前步驟**: {self.current_step}",
            "",
            "## 🎯 主要目標",
            f"{self.primary_objective}",
            "",
            "## 📋 任務列表",
            ""
        ]
        
        # 待處理任務
        pending = [t for t in self.todo_items if t["status"] == "pending"]
        if pending:
            md_content.append("### ⏳ 待處理")
            for task in pending:
                priority_emoji = "🔥" if task["priority"] == "high" else "📌" if task["priority"] == "medium" else "📝"
                md_content.append(f"- [ ] {priority_emoji} {task['task']}")
            md_content.append("")
        
        # 已完成任務
        if self.completed_tasks:
            md_content.append("### ✅ 已完成")
            for task in self.completed_tasks[-5:]:  # 只顯示最近5個
                md_content.append(f"- [x] {task['task']}")
            md_content.append("")
            
        md_content.extend([
            "## 🧠 注意力提醒",
            "",
            "- 始終記住主要目標",
            "- 每完成一個任務都要檢查進度",
            "- 如果偏離目標，立即重新對焦",
            "- 定期更新這個Todo.md文件"
        ])
        
        return "\\n".join(md_content)
    
    def save_todo_md(self, filename: str = "todo.md"):
        """保存Todo.md到文件系統"""
        content = self.generate_todo_md()
        
        # 使用文件系統上下文保存
        if 'fs_context' in globals():
            fs_context.save_context("current_todo", {"content": content}, "todo_md")
        
        # 也直接保存到文件
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
            
        print(f"💾 Todo.md已保存: {filename}")

# 創建並測試注意力復述代理
recitation_agent = RecitationAgent("實現完整的上下文工程系統")

# 添加一些示例任務
recitation_agent.add_todo("設計KV-Cache優化策略", "high")
recitation_agent.add_todo("實現工具遮罩機制", "high") 
recitation_agent.add_todo("構建文件系統外部內存", "high")
recitation_agent.add_todo("集成Kimi K2 API", "medium")
recitation_agent.add_todo("創建監控儀表板", "low")

print("\\n" + "="*50)
print("測試注意力復述機制:")
recitation_agent.complete_task(1, "已成功實現KV-Cache優化的上下文管理類")

## 6️⃣ Kimi K2集成與實際應用

**實戰應用**: 將上下文工程技術與Kimi K2模型結合
- 安全的API密鑰管理
- KV-Cache友好的請求結構
- 錯誤處理和恢復機制
- 實際生產部署考慮

In [None]:
class ErrorAwareAgent:
    """
    錯誤感知的AI Agent - 將失敗轉化為學習信號
    
    設計原理:
    - 保留所有錯誤和失敗在上下文中
    - 從錯誤中提取學習信號
    - 錯誤恢復作為agentic行為指標
    """
    
    def __init__(self):
        self.error_history = []
        self.learning_signals = []
        self.adaptation_patterns = {}
        self.recovery_strategies = {}
        
        print("🧠 錯誤感知代理初始化完成")
        
    def handle_tool_error(self, tool_name: str, error_type: str, error_details: str, 
                         attempted_args: dict, context: str = "") -> str:
        """
        處理工具錯誤並保留為學習信號
        
        Args:
            tool_name: 出錯的工具名稱
            error_type: 錯誤類型
            error_details: 錯誤詳情
            attempted_args: 嘗試的參數
            context: 錯誤發生的上下文
            
        Returns:
            格式化的錯誤上下文字符串
        """
        timestamp = datetime.now().isoformat()
        
        error_record = {
            "timestamp": timestamp,
            "tool_name": tool_name,
            "error_type": error_type,
            "error_details": error_details,
            "attempted_args": attempted_args,
            "context": context,
            "recovery_attempted": False
        }
        
        self.error_history.append(error_record)
        
        # 生成學習信號
        learning_signal = self._extract_learning_signal(error_record)
        self.learning_signals.append(learning_signal)
        
        # 格式化錯誤上下文 - 保留在上下文中而非清理
        error_context = f"""
💥 工具執行失敗記錄 (保留用於學習):
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🔧 工具: {tool_name}
❌ 錯誤類型: {error_type}
📝 錯誤詳情: {error_details}
⚙️ 嘗試參數: {json.dumps(attempted_args, ensure_ascii=False)}
📍 上下文: {context}
⏰ 時間: {timestamp}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

🧠 學習信號: {learning_signal['insight']}
🔄 建議調整: {learning_signal['adjustment']}

⚠️ 此錯誤已記錄，將影響後續決策制定。
下次使用 {tool_name} 時請考慮此失敗經驗。
"""
        
        print(f"💥 錯誤已記錄並保留: {tool_name} - {error_type}")
        return error_context
    
    def _extract_learning_signal(self, error_record: dict) -> dict:
        """從錯誤中提取學習信號"""
        tool_name = error_record["tool_name"]
        error_type = error_record["error_type"]
        
        # 基於錯誤類型的學習模式
        if error_type == "invalid_arguments":
            insight = f"{tool_name} 的參數格式或值有問題"
            adjustment = "檢查參數類型、格式和有效值範圍"
        elif error_type == "permission_denied":
            insight = f"{tool_name} 需要特定權限或認證"
            adjustment = "確認權限設置或提供必要的認證信息"
        elif error_type == "resource_not_found":
            insight = f"{tool_name} 嘗試訪問不存在的資源"
            adjustment = "在使用前先驗證資源存在性"
        elif error_type == "timeout":
            insight = f"{tool_name} 執行時間過長"
            adjustment = "減少請求複雜度或增加超時時間"
        else:
            insight = f"{tool_name} 遇到未預期的錯誤狀況"
            adjustment = "檢查工具狀態和環境配置"
            
        return {
            "insight": insight,
            "adjustment": adjustment,
            "pattern_detected": self._detect_error_pattern(tool_name, error_type),
            "confidence": 0.8
        }
    
    def _detect_error_pattern(self, tool_name: str, error_type: str) -> bool:
        """檢測錯誤模式"""
        similar_errors = [
            e for e in self.error_history 
            if e["tool_name"] == tool_name and e["error_type"] == error_type
        ]
        
        # 如果同樣的錯誤發生3次以上，認為是模式
        return len(similar_errors) >= 3
    
    def attempt_recovery(self, error_record: dict, recovery_strategy: str) -> str:
        """
        嘗試錯誤恢復
        
        Args:
            error_record: 錯誤記錄
            recovery_strategy: 恢復策略
            
        Returns:
            恢復嘗試的上下文字符串
        """
        error_record["recovery_attempted"] = True
        error_record["recovery_strategy"] = recovery_strategy
        error_record["recovery_timestamp"] = datetime.now().isoformat()
        
        recovery_context = f"""
🔄 錯誤恢復嘗試:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎯 針對錯誤: {error_record['tool_name']} - {error_record['error_type']}
🛠️ 恢復策略: {recovery_strategy}
⏰ 恢復時間: {error_record['recovery_timestamp']}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

💡 這次恢復嘗試將被記錄，用於評估agentic行為能力。
"""
        
        # 記錄恢復策略的效果
        if recovery_strategy not in self.recovery_strategies:
            self.recovery_strategies[recovery_strategy] = {"attempts": 0, "successes": 0}
        
        self.recovery_strategies[recovery_strategy]["attempts"] += 1
        
        print(f"🔄 嘗試錯誤恢復: {recovery_strategy}")
        return recovery_context
    
    def mark_recovery_success(self, recovery_strategy: str):
        """標記恢復成功"""
        if recovery_strategy in self.recovery_strategies:
            self.recovery_strategies[recovery_strategy]["successes"] += 1
            print(f"✅ 恢復成功: {recovery_strategy}")
    
    def get_error_insights(self) -> str:
        """獲取錯誤洞察報告"""
        if not self.error_history:
            return "📊 暫無錯誤記錄"
        
        # 統計分析
        tool_errors = {}
        error_types = {}
        
        for error in self.error_history:
            tool = error["tool_name"]
            etype = error["error_type"]
            
            tool_errors[tool] = tool_errors.get(tool, 0) + 1
            error_types[etype] = error_types.get(etype, 0) + 1
        
        # 生成報告
        report = [
            "📊 錯誤學習洞察報告",
            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
            f"總錯誤數: {len(self.error_history)}",
            f"學習信號數: {len(self.learning_signals)}",
            "",
            "🔧 工具錯誤統計:"
        ]
        
        for tool, count in sorted(tool_errors.items(), key=lambda x: x[1], reverse=True):
            report.append(f"  - {tool}: {count} 次")
            
        report.extend(["", "❌ 錯誤類型統計:"])
        for etype, count in sorted(error_types.items(), key=lambda x: x[1], reverse=True):
            report.append(f"  - {etype}: {count} 次")
            
        # 恢復策略效果
        if self.recovery_strategies:
            report.extend(["", "🔄 恢復策略效果:"])
            for strategy, stats in self.recovery_strategies.items():
                success_rate = stats["successes"] / stats["attempts"] * 100
                report.append(f"  - {strategy}: {stats['successes']}/{stats['attempts']} ({success_rate:.1f}%)")
        
        report.extend([
            "",
            "💡 關鍵學習:",
            "  - 錯誤是寶貴的學習資源，不應被清理",
            "  - 重複錯誤模式需要系統性解決",
            "  - 恢復能力是agentic行為的重要指標"
        ])
        
        return "\\n".join(report)

# 創建並測試錯誤感知代理
error_agent = ErrorAwareAgent()

print("✅ 錯誤感知代理創建完成")

# 模擬一些錯誤場景
print("\\n" + "="*50)
print("模擬錯誤場景和學習過程:")

# 場景1: 參數錯誤
error_context1 = error_agent.handle_tool_error(
    tool_name="file_read",
    error_type="invalid_arguments", 
    error_details="路徑參數必須是字符串，收到的是列表",
    attempted_args={"path": ["file1.txt", "file2.txt"]},
    context="嘗試批量讀取文件時發生錯誤"
)

# 場景2: 權限錯誤
error_context2 = error_agent.handle_tool_error(
    tool_name="shell_execute",
    error_type="permission_denied",
    error_details="沒有執行sudo命令的權限",
    attempted_args={"command": "sudo apt-get install package"},
    context="嘗試安裝系統包時被拒絕"
)

print("\\n📊 當前錯誤洞察:")
print(error_agent.get_error_insights())

## 7️⃣ 交互式監控儀表板

**可視化監控**: 實時跟踪上下文工程指標和系統性能
- 緩存效率可視化
- 工具使用統計  
- 錯誤模式分析
- 注意力復述頻率

In [None]:
class KimiContextEngineeredAgent:
    """
    集成Kimi K2的上下文工程代理
    
    結合了所有上下文工程最佳實踐:
    - KV-Cache優化的請求結構
    - 工具遮罩管理
    - 文件系統外部內存
    - 注意力復述機制
    - 錯誤保留學習
    """
    
    def __init__(self, api_config: dict):
        self.api_config = api_config
        self.client = httpx.AsyncClient(
            base_url=api_config["base_url"],
            headers={"Authorization": f"Bearer {api_config['api_key']}"},
            timeout=30.0
        )
        
        # 集成所有上下文工程組件
        self.context_manager = ContextOptimizedAgent()
        self.tool_manager = MaskedToolAgent() 
        self.fs_context = FileSystemContext()
        self.recitation_agent = RecitationAgent("執行複雜任務")
        self.error_agent = ErrorAwareAgent()
        
        # 請求統計
        self.request_stats = {
            "total_requests": 0,
            "cache_hits": 0,
            "errors": 0,
            "total_tokens": 0
        }
        
        print("🚀 Kimi上下文工程代理初始化完成")
        
    async def chat_completion(self, user_input: str, use_cache_optimization: bool = True) -> dict:
        """
        發送上下文優化的聊天請求到Kimi K2
        
        Args:
            user_input: 用戶輸入
            use_cache_optimization: 是否使用緩存優化
            
        Returns:
            API響應結果
        """
        try:
            # 使用上下文優化構建提示
            optimized_prompt = self.context_manager.build_prompt(
                user_input, 
                preserve_cache=use_cache_optimization
            )
            
            # 構建請求
            request_data = {
                "model": self.api_config["model"],
                "messages": [
                    {"role": "user", "content": optimized_prompt}
                ],
                "max_tokens": self.api_config["max_tokens"],
                "temperature": self.api_config["temperature"]
            }
            
            # 發送請求
            self.request_stats["total_requests"] += 1
            
            response = await self.client.post(
                "/chat/completions",
                json=request_data
            )
            
            if response.status_code == 200:
                result = response.json()
                
                # 更新統計
                if "usage" in result:
                    self.request_stats["total_tokens"] += result["usage"].get("total_tokens", 0)
                
                # 估算緩存命中 (簡化邏輯)
                if use_cache_optimization and len(self.context_manager.session_history) > 0:
                    self.request_stats["cache_hits"] += 1
                
                # 添加到會話歷史
                assistant_response = result["choices"][0]["message"]["content"]
                self.context_manager.add_interaction(user_input, assistant_response)
                
                print(f"✅ Kimi請求成功，使用緩存優化: {use_cache_optimization}")
                return result
                
            else:
                # 處理API錯誤
                error_details = f"HTTP {response.status_code}: {response.text}"
                self._handle_api_error("api_request", "http_error", error_details, request_data)
                return {"error": error_details}
                
        except Exception as e:
            # 處理連接錯誤
            error_details = str(e)
            self._handle_api_error("api_request", "connection_error", error_details, {})
            return {"error": error_details}
    
    def _handle_api_error(self, tool_name: str, error_type: str, error_details: str, attempted_args: dict):
        """處理API錯誤並應用錯誤學習"""
        self.request_stats["errors"] += 1
        
        error_context = self.error_agent.handle_tool_error(
            tool_name=tool_name,
            error_type=error_type,
            error_details=error_details,
            attempted_args=attempted_args,
            context="Kimi API調用過程中"
        )
        
        print(f"🔥 API錯誤已記錄: {error_type}")
        return error_context
    
    async def execute_tool_with_context(self, tool_name: str, **kwargs) -> dict:
        """
        在上下文感知狀態下執行工具
        
        Args:
            tool_name: 工具名稱
            **kwargs: 工具參數
            
        Returns:
            執行結果
        """
        # 檢查工具是否可用
        if not self.tool_manager.is_tool_available(tool_name):
            return {
                "error": f"工具 {tool_name} 在當前狀態下不可用",
                "masked": True
            }
        
        try:
            # 模擬工具執行 (實際應用中這裡會調用真實工具)
            if tool_name == "file_read":
                if "path" not in kwargs:
                    raise ValueError("缺少必要參數: path")
                
                # 使用外部內存系統
                content = self.fs_context.load_context(kwargs["path"])
                if content is None:
                    return {"error": "文件不存在或無法讀取"}
                
                return {"success": True, "content": content}
                
            elif tool_name == "browser_navigate":
                if "url" not in kwargs:
                    raise ValueError("缺少必要參數: url")
                
                # 模擬導航並保存到外部內存
                page_data = {
                    "url": kwargs["url"],
                    "title": f"頁面標題 - {kwargs['url']}",
                    "content": f"從 {kwargs['url']} 獲取的頁面內容...",
                    "timestamp": datetime.now().isoformat()
                }
                
                self.fs_context.save_context(f"page_{kwargs['url'].replace('/', '_')}", page_data, "web_page")
                return {"success": True, "page_data": page_data}
                
            else:
                return {"error": f"未實現的工具: {tool_name}"}
                
        except Exception as e:
            # 錯誤處理和學習
            error_context = self.error_agent.handle_tool_error(
                tool_name=tool_name,
                error_type="execution_error",
                error_details=str(e),
                attempted_args=kwargs,
                context="工具執行過程中"
            )
            
            return {"error": str(e), "error_context": error_context}
    
    async def complete_complex_task(self, task_description: str, max_steps: int = 20) -> dict:
        """
        執行複雜任務並應用所有上下文工程技術
        
        Args:
            task_description: 任務描述
            max_steps: 最大步驟數
            
        Returns:
            任務執行結果
        """
        print(f"🎯 開始執行複雜任務: {task_description}")
        
        # 設置任務目標
        self.recitation_agent.primary_objective = task_description
        
        # 設置工具狀態
        self.tool_manager.set_state("task_execution", ["file", "browser"])
        
        results = []
        
        for step in range(max_steps):
            # 注意力復述檢查
            recitation = self.recitation_agent.step_forward(f"執行第{step+1}步")
            if recitation:
                print(f"🔄 觸發注意力復述:\\n{recitation}")
            
            # 構建當前步驟的上下文
            step_prompt = f"""
當前任務: {task_description}
步驟 {step+1}/{max_steps}

可用工具: {', '.join(self.tool_manager.get_available_tools())}

請分析當前情況並決定下一步行動。
如果任務已完成，請回答'TASK_COMPLETED'。
如果需要更多信息，請說明需要什麼。
"""
            
            # 請求Kimi決策
            response = await self.chat_completion(step_prompt)
            
            if "error" in response:
                print(f"❌ 步驟 {step+1} 出錯: {response['error']}")
                continue
                
            assistant_response = response["choices"][0]["message"]["content"]
            results.append({
                "step": step + 1,
                "response": assistant_response,
                "timestamp": datetime.now().isoformat()
            })
            
            # 檢查任務完成
            if "TASK_COMPLETED" in assistant_response:
                print(f"✅ 任務在第{step+1}步完成")
                break
                
            # 模擬執行建議的動作
            print(f"👣 步驟 {step+1}: {assistant_response[:100]}...")
        
        # 生成任務報告
        report = {
            "task": task_description,
            "completed": True,
            "total_steps": len(results),
            "final_result": results[-1] if results else None,
            "context_stats": self.get_performance_stats(),
            "workspace_summary": self.fs_context.get_workspace_summary()
        }
        
        print(f"📊 任務完成報告已生成")
        return report
    
    def get_performance_stats(self) -> dict:
        """獲取性能統計"""
        cache_efficiency = (
            self.request_stats["cache_hits"] / self.request_stats["total_requests"] * 100
            if self.request_stats["total_requests"] > 0 else 0
        )
        
        return {
            "總請求數": self.request_stats["total_requests"],
            "緩存命中數": self.request_stats["cache_hits"], 
            "緩存效率": f"{cache_efficiency:.1f}%",
            "錯誤數": self.request_stats["errors"],
            "總Token數": self.request_stats["total_tokens"],
            "錯誤洞察": self.error_agent.get_error_insights()
        }
    
    async def cleanup(self):
        """清理資源"""
        await self.client.aclose()
        print("🧹 資源清理完成")

# 使用示例
async def demo_kimi_context_engineering():
    """演示Kimi上下文工程代理的使用"""
    print("🔧 準備配置Kimi API...")
    
    # 注意: 實際使用時需要真實的API配置
    demo_config = {
        "api_key": "your-kimi-api-key-here",  # 實際使用時通過setup_kimi_api()獲取
        "base_url": "https://api.moonshot.cn/v1",
        "model": "moonshot-v1-32k",
        "max_tokens": 2048,
        "temperature": 0.3
    }
    
    print("⚠️ 演示模式 - 使用模擬配置")
    print("實際使用時請運行: kimi_config = setup_kimi_api()")
    
    # agent = KimiContextEngineeredAgent(demo_config)
    # 
    # try:
    #     # 演示簡單對話
    #     response = await agent.chat_completion("解釋什麼是上下文工程")
    #     print(f"📝 回應: {response}")
    #     
    #     # 演示複雜任務
    #     task_result = await agent.complete_complex_task("分析上下文工程的核心概念")
    #     print(f"📊 任務結果: {task_result}")
    #     
    # finally:
    #     await agent.cleanup()

print("✅ Kimi K2集成代理定義完成")
print("💡 使用方法:")
print("1. 先運行 'kimi_config = setup_kimi_api()' 配置API")
print("2. 創建代理: 'agent = KimiContextEngineeredAgent(kimi_config)'") 
print("3. 使用: 'await agent.chat_completion(\"你的問題\")'")
print("4. 複雜任務: 'await agent.complete_complex_task(\"任務描述\")'")

# 運行演示
# await demo_kimi_context_engineering()  # 取消註釋以運行

## 🎯 實戰總結與下一步

### ✅ 已完成的實現

本筆記本成功實現了基於Manus博客的上下文工程核心技術：

1. **KV-Cache優化** (`ContextOptimizedAgent`)
   - 穩定前綴設計實現緩存友好
   - Append-only上下文構建
   - 確定性序列化避免緩存失效

2. **工具遮罩管理** (`MaskedToolAgent`) 
   - 保持工具定義穩定性
   - 通過狀態機控制可用性
   - 前綴命名約定簡化管理

3. **文件系統外部內存** (`FileSystemContext`)
   - 無限大小的上下文存儲
   - 可恢復的壓縮策略
   - 選擇性內容檢索

4. **注意力復述機制** (`RecitationAgent`)
   - 防止注意力漂移
   - Todo.md作為錨定機制
   - 長任務序列焦點維持

5. **錯誤保留學習** (`ErrorAwareAgent`)
   - 錯誤作為學習信號
   - 恢復策略效果追蹤
   - agentic行為能力評估

6. **Kimi K2集成** (`KimiContextEngineeredAgent`)
   - 安全API密鑰管理
   - 生產級錯誤處理
   - 完整上下文工程流水線

7. **監控儀表板** (`ContextEngineeringDashboard`)
   - 實時性能追蹤
   - 可視化關鍵指標
   - 交互式控制介面

### 🔑 關鍵技術成果

- **成本優化**: KV-Cache命中可降低推理成本10倍
- **穩定性**: 工具遮罩避免破壞緩存的動態修改
- **可擴展性**: 文件系統突破上下文窗口限制  
- **可靠性**: 錯誤學習機制提升系統適應能力
- **可觀測性**: 全面的監控和分析能力

### 🚀 生產部署建議

1. **環境準備**
   ```bash
   pip install httpx asyncio ipywidgets plotly
   ```

2. **API配置**
   ```python
   kimi_config = setup_kimi_api()  # 安全輸入API密鑰
   ```

3. **創建完整代理**
   ```python
   agent = KimiContextEngineeredAgent(kimi_config)
   dashboard = ContextEngineeringDashboard(agent)
   ```

4. **監控和維護**
   ```python
   dashboard.display()  # 啟動監控儀表板
   ```

### 📚 延伸學習方向

- **Neural Turing Machines** - 進階外部內存架構
- **State Space Models** - 更高效的序列處理
- **Retrieval-Augmented Generation** - 動態知識注入
- **Model Context Protocol** - 標準化工具集成

### 💡 實際應用場景

- **企業級客服系統** - 長對話記憶管理
- **代碼生成助手** - 大型項目上下文理解  
- **研究分析工具** - 文檔知識圖譜構建
- **教育輔導平台** - 個性化學習路徑追蹤

---

**🎉 恭喜！您已掌握了生產級AI Agent的上下文工程精髓！**