# 向量搜索应用集成
# Vector Search Applications Integration

这个笔记本展示如何将所有组件集成为完整的向量搜索应用系统。

In [None]:
# 导入所有必要的库
import sys
sys.path.append('..')

import numpy as np
import matplotlib.pyplot as plt
import json
from src.basic_vector_search import BasicVectorSearch
from src.text_vectorizer import TextVectorizer
from src.utils import load_json, save_json
from examples.document_search import DocumentSearchSystem
from examples.semantic_search import SemanticSearchSystem
from examples.recommendation import ContentRecommendationSystem

## 1. 数据准备

In [None]:
# Load the sample corpus that ships with the project.
documents = load_json('../data/sample_documents.json')
print(f"加载了 {len(documents)} 个文档")

# Preview the first record so its field layout is visible.
# String values longer than 50 characters are cut and suffixed with "...".
if documents:
    print("\n文档结构示例:")
    sample_doc = documents[0]
    for key, value in sample_doc.items():
        is_long_text = isinstance(value, str) and len(value) > 50
        shown = f"{value[:50]}..." if is_long_text else value
        print(f"{key}: {shown}")

## 2. 文档搜索系统演示

In [None]:
# 初始化文档搜索系统
print("=== 文档搜索系统演示 ===")

doc_search = DocumentSearchSystem(documents)
print(f"文档搜索系统已初始化，包含 {len(documents)} 个文档")

# 测试不同类型的搜索
test_queries = [
    "人工智能",
    "机器学习算法",
    "数据分析"
]

search_types = ['keyword', 'semantic', 'hybrid']

for query in test_queries:
    print(f"\n查询: '{query}'")
    print("-" * 50)
    
    for search_type in search_types:
        print(f"\n{search_type.upper()} 搜索结果:")
        try:
            results = doc_search.search(query, method=search_type, top_k=3)
            
            if results:
                for i, result in enumerate(results):
                    title = result.get('title', f"文档{result.get('doc_id', 'N/A')}")
                    score = result.get('score', 0)
                    print(f"  {i+1}. {title} (得分: {score:.3f})")
            else:
                print("  未找到相关结果")
        except Exception as e:
            print(f"  搜索出错: {e}")

## 3. 语义搜索系统演示

In [None]:
# 初始化语义搜索系统
print("\n=== 语义搜索系统演示 ===")

try:
    semantic_search = SemanticSearchSystem()
    
    # 添加文档
    for doc in documents:
        semantic_search.add_document(
            content=doc['content'],
            metadata={
                'title': doc.get('title', ''),
                'category': doc.get('category', ''),
                'doc_id': doc.get('id', '')
            }
        )
    
    print(f"语义搜索系统已初始化，包含 {len(documents)} 个文档")
    
    # 测试语义搜索
    semantic_queries = [
        "深度学习模型训练",
        "自然语言处理技术",
        "计算机视觉应用"
    ]
    
    for query in semantic_queries:
        print(f"\n语义查询: '{query}'")
        print("-" * 40)
        
        results = semantic_search.search(query, top_k=3)
        
        if results:
            for i, result in enumerate(results):
                title = result['metadata'].get('title', f"文档{i+1}")
                similarity = result['similarity']
                content_preview = result['content'][:100] + "..."
                print(f"  {i+1}. {title}")
                print(f"     相似度: {similarity:.3f}")
                print(f"     内容: {content_preview}")
        else:
            print("  未找到相关结果")
    
    # 文档聚类分析
    print("\n=== 文档聚类分析 ===")
    try:
        clusters = semantic_search.cluster_documents(n_clusters=3)
        
        print(f"将文档分为 {len(clusters)} 个聚类:")
        for cluster_id, cluster_docs in clusters.items():
            print(f"\n聚类 {cluster_id} ({len(cluster_docs)} 个文档):")
            for doc_idx in cluster_docs[:3]:  # 显示前3个
                if doc_idx < len(documents):
                    title = documents[doc_idx].get('title', f"文档{doc_idx}")
                    print(f"  - {title}")
    except Exception as e:
        print(f"聚类分析出错: {e}")
        
except Exception as e:
    print(f"语义搜索系统初始化失败: {e}")
    print("可能需要安装sentence-transformers: pip install sentence-transformers")

## 4. 推荐系统演示

In [None]:
# 初始化推荐系统
print("\n=== 内容推荐系统演示 ===")

try:
    rec_system = ContentRecommendationSystem(documents, vector_type='tfidf')
    print(f"推荐系统已初始化，包含 {len(documents)} 个文档")
    
    # 模拟用户行为
    print("\n模拟用户行为...")
    
    # 用户1: 对AI和机器学习感兴趣
    user1_id = "user_ai_enthusiast"
    ai_related_docs = [i for i, doc in enumerate(documents) 
                      if any(keyword in doc.get('content', '').lower() 
                            for keyword in ['人工智能', '机器学习', 'ai', '算法'])]
    
    for doc_id in ai_related_docs[:3]:  # 与前3个AI相关文档交互
        rec_system.add_user_interaction(user1_id, doc_id, 'like')
    
    # 用户2: 对数据分析感兴趣
    user2_id = "user_data_analyst"
    data_related_docs = [i for i, doc in enumerate(documents) 
                        if any(keyword in doc.get('content', '').lower() 
                              for keyword in ['数据', '分析', '统计', '可视化'])]
    
    for doc_id in data_related_docs[:3]:  # 与前3个数据相关文档交互
        rec_system.add_user_interaction(user2_id, doc_id, 'like')
    
    print(f"为用户 {user1_id} 和 {user2_id} 模拟了交互行为")
    
    # 生成推荐
    users = [user1_id, user2_id]
    strategies = ['content', 'popularity', 'hybrid']
    
    for user_id in users:
        print(f"\n=== {user_id} 的推荐结果 ===")
        
        # 显示用户统计
        user_stats = rec_system.get_user_statistics(user_id)
        if 'error' not in user_stats:
            print(f"用户统计: {user_stats['total_interactions']} 次交互, "
                  f"{user_stats['liked_items_count']} 个喜欢的物品")
        
        for strategy in strategies:
            print(f"\n{strategy.upper()} 策略推荐:")
            try:
                recommendations = rec_system.recommend_items(user_id, 3, strategy)
                
                if recommendations:
                    for i, rec in enumerate(recommendations):
                        doc = rec['document']
                        title = doc.get('title', f"文档{rec['item_id']}")
                        similarity = rec['similarity']
                        reason = rec['reason']
                        print(f"  {i+1}. {title}")
                        print(f"     相似度: {similarity:.3f}")
                        print(f"     推荐原因: {reason}")
                else:
                    print("  暂无推荐")
            except Exception as e:
                print(f"  推荐生成出错: {e}")
    
    # 推荐解释
    print(f"\n=== 推荐解释示例 ===")
    if len(documents) > 0:
        item_to_explain = 0  # 解释第一个文档的推荐
        explanation = rec_system.explain_recommendation(user1_id, item_to_explain)
        
        print(f"为用户 {user1_id} 推荐文档 {item_to_explain} 的原因:")
        print(json.dumps(explanation, indent=2, ensure_ascii=False))

except Exception as e:
    print(f"推荐系统初始化失败: {e}")

## 5. 系统性能分析

In [None]:
# 综合性能分析
print("\n=== 系统性能分析 ===")

import time
import psutil
import os

def measure_system_performance():
    """Sample resource usage of the current Python process.

    Returns:
        dict with three entries:
        - ``'memory_mb'``: ``memory_info().rss`` divided by 1024 twice,
        - ``'cpu_percent'``: CPU utilisation measured since the previous call
          of this function,
        - ``'num_threads'``: thread count reported by psutil.
    """
    # psutil computes cpu_percent() relative to the previous call on the SAME
    # Process object, and the first call on a new object is a meaningless 0.0.
    # Creating a new Process per sample therefore always reported 0.0, so one
    # handle is kept alive on the function object and primed once.
    process = getattr(measure_system_performance, '_process', None)
    if process is None:
        process = psutil.Process(os.getpid())
        process.cpu_percent()  # priming call; the return value is discarded
        measure_system_performance._process = process

    return {
        'memory_mb': process.memory_info().rss / 1024 / 1024,
        'cpu_percent': process.cpu_percent(),
        'num_threads': process.num_threads()
    }

# 测试各系统的初始化时间和资源使用
systems_performance = {}

# 1. 基础向量搜索
print("测试基础向量搜索性能...")
start_time = time.time()
start_perf = measure_system_performance()

basic_search = BasicVectorSearch()
# 生成测试向量
test_vectors = np.random.randn(1000, 128)
basic_search.add_vectors(test_vectors)

# 执行搜索
query = np.random.randn(128)
_ = basic_search.search(query, k=10)

end_time = time.time()
end_perf = measure_system_performance()

systems_performance['基础搜索'] = {
    'init_time': end_time - start_time,
    'memory_usage': end_perf['memory_mb'] - start_perf['memory_mb'],
    'total_memory': end_perf['memory_mb']
}

# 2. 文本向量化
print("测试文本向量化性能...")
start_time = time.time()
start_perf = measure_system_performance()

vectorizer = TextVectorizer(method='tfidf')
sample_texts = [doc['content'] for doc in documents[:100]]  # 使用前100个文档
vectors = vectorizer.fit_transform(sample_texts)

end_time = time.time()
end_perf = measure_system_performance()

systems_performance['文本向量化'] = {
    'init_time': end_time - start_time,
    'memory_usage': end_perf['memory_mb'] - start_perf['memory_mb'],
    'total_memory': end_perf['memory_mb']
}

# 打印性能报告
print("\n性能报告:")
print("-" * 60)
print(f"{'系统':<15} {'初始化时间(秒)':<15} {'内存增量(MB)':<15} {'总内存(MB)':<15}")
print("-" * 60)

for system_name, perf in systems_performance.items():
    print(f"{system_name:<15} {perf['init_time']:<15.3f} {perf['memory_usage']:<15.1f} {perf['total_memory']:<15.1f}")

# 可视化性能数据
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

systems = list(systems_performance.keys())
init_times = [systems_performance[s]['init_time'] for s in systems]
memory_usage = [systems_performance[s]['memory_usage'] for s in systems]

# 初始化时间
ax1.bar(systems, init_times)
ax1.set_ylabel('初始化时间 (秒)')
ax1.set_title('系统初始化时间比较')
ax1.tick_params(axis='x', rotation=45)

# 内存使用
ax2.bar(systems, memory_usage)
ax2.set_ylabel('内存增量 (MB)')
ax2.set_title('系统内存使用比较')
ax2.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

## 6. 端到端应用演示

In [None]:
# 构建一个简单的端到端应用
print("\n=== 端到端应用演示 ===")

class IntegratedSearchSystem:
    """Facade bundling document search, semantic search and recommendations.

    The ``DocumentSearchSystem`` backend is mandatory. The semantic backend
    and the recommender are optional: when either fails to initialise, the
    matching ``has_semantic`` / ``has_recommendation`` flag is set to
    ``False`` and the remaining features keep working.
    """

    def __init__(self, documents):
        """Build every backend from ``documents``.

        Args:
            documents: Iterable of dicts. ``'content'`` is read with ``[]``
                (required); ``'title'`` and ``'id'`` are read with ``.get``
                and default to ``''``.
        """
        self.documents = documents
        self.doc_search = DocumentSearchSystem(documents)

        # Optional semantic backend. ``except Exception`` (rather than a bare
        # ``except``) lets Ctrl-C / SystemExit propagate, and the reason is
        # printed so a missing dependency is not silently hidden.
        try:
            self.semantic_search = SemanticSearchSystem()
            for doc in documents:
                self.semantic_search.add_document(
                    content=doc['content'],
                    metadata={'title': doc.get('title', ''), 'id': doc.get('id', '')}
                )
            self.has_semantic = True
        except Exception as e:
            self.has_semantic = False
            print(f"语义搜索不可用，仅使用关键词搜索: {e}")

        # Optional recommender, same failure policy as above.
        try:
            self.rec_system = ContentRecommendationSystem(documents)
            self.has_recommendation = True
        except Exception as e:
            self.has_recommendation = False
            print(f"推荐系统不可用: {e}")

    def unified_search(self, query, search_type='auto', top_k=5):
        """Run one query through the selected backend.

        Args:
            query: Query text.
            search_type: ``'auto'``, ``'keyword'``, ``'semantic'`` or
                ``'hybrid'``. ``'auto'`` resolves to ``'semantic'`` when the
                semantic backend is available, otherwise to ``'keyword'``.
            top_k: Maximum number of results requested from the backend.

        Returns:
            The backend's result list. Semantic hits are reshaped into dicts
            with ``'title'``, ``'score'``, ``'content'`` (first 200 characters
            plus ``'...'``) and ``'type'``. An empty list is returned for an
            unknown ``search_type`` and for ``'semantic'`` when the semantic
            backend is unavailable.
        """
        results = []

        if search_type == 'auto':
            search_type = 'semantic' if self.has_semantic else 'keyword'

        if search_type == 'keyword':
            results = self.doc_search.search(query, method='keyword', top_k=top_k)
        elif search_type == 'semantic' and self.has_semantic:
            semantic_results = self.semantic_search.search(query, top_k=top_k)
            results = [{
                'title': r['metadata'].get('title', ''),
                'score': r['similarity'],
                'content': r['content'][:200] + '...',
                'type': 'semantic'
            } for r in semantic_results]
        elif search_type == 'hybrid':
            results = self.doc_search.search(query, method='hybrid', top_k=top_k)

        return results

    def get_recommendations(self, user_id, num_recs=5):
        """Return up to ``num_recs`` hybrid-strategy recommendations.

        Returns an empty list when the recommender is unavailable or when it
        raises; in the latter case the error is printed instead of being
        dropped silently.
        """
        if not self.has_recommendation:
            return []

        try:
            return self.rec_system.recommend_items(user_id, num_recs, 'hybrid')
        except Exception as e:
            print(f"推荐生成出错: {e}")
            return []

    def add_user_feedback(self, user_id, doc_id, feedback_type):
        """Forward one user interaction to the recommender, if it is available."""
        if self.has_recommendation:
            self.rec_system.add_user_interaction(user_id, doc_id, feedback_type)

# 初始化集成系统
integrated_system = IntegratedSearchSystem(documents)
print("集成搜索系统已初始化")

# 演示统一搜索
demo_queries = ["机器学习", "数据分析", "人工智能应用"]

for query in demo_queries:
    print(f"\n搜索查询: '{query}'")
    print("-" * 40)
    
    results = integrated_system.unified_search(query, top_k=3)
    
    if results:
        for i, result in enumerate(results):
            title = result.get('title', '无标题')
            score = result.get('score', 0)
            result_type = result.get('type', 'keyword')
            print(f"  {i+1}. {title} [{result_type}]")
            print(f"     得分: {score:.3f}")
            if 'content' in result:
                print(f"     摘要: {result['content'][:100]}...")
    else:
        print("  未找到相关结果")

# 演示推荐功能
if integrated_system.has_recommendation:
    print("\n=== 推荐演示 ===")
    
    # 模拟用户交互
    demo_user = "demo_user"
    integrated_system.add_user_feedback(demo_user, 0, 'like')
    integrated_system.add_user_feedback(demo_user, 1, 'view')
    
    recommendations = integrated_system.get_recommendations(demo_user, 3)
    
    if recommendations:
        print(f"为用户 {demo_user} 的推荐:")
        for i, rec in enumerate(recommendations):
            doc = rec['document']
            title = doc.get('title', f"文档{rec['item_id']}")
            print(f"  {i+1}. {title} (相似度: {rec['similarity']:.3f})")
    else:
        print("暂无推荐")
else:
    print("\n推荐功能不可用")

## 7. 总结和部署建议

### 系统特点:
- **模块化设计**: 各组件可独立使用
- **灵活配置**: 支持多种搜索和推荐策略
- **可扩展**: 易于添加新功能和算法

### 部署建议:
1. **开发环境**: 使用基础搜索和TF-IDF
2. **生产环境**: 集成FAISS和语义搜索
3. **大规模部署**: 考虑分布式架构

### 性能优化:
- 使用向量缓存
- 批量处理
- 异步搜索
- 索引预热

### 监控指标:
- 搜索延迟
- 内存使用
- 索引大小
- 用户满意度

这个向量搜索项目提供了完整的基础架构，可以根据具体需求进行定制和扩展。

# 向量搜索实际应用案例

本notebook展示向量搜索在实际场景中的应用，包括文档搜索、推荐系统、语义检索等。

## 学习目标
- 掌握文档搜索系统的构建
- 理解推荐系统中的向量化应用
- 学习语义搜索的实现方法
- 了解向量搜索的评估指标

In [None]:
# 导入必要的库
import sys
import os
sys.path.append(os.path.join(os.getcwd(), '..'))

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import normalize
import json
import time
from collections import defaultdict, Counter

from src.basic_vector_search import BasicVectorSearch
from src.advanced_search import FAISSSearch
from src.text_vectorizer import TextVectorizer
from src.utils import load_documents, VectorSearchBenchmark

plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False   # 用来正常显示负号

## 1. 文档搜索系统

构建一个完整的文档搜索系统，支持关键词搜索、语义搜索，以及将两者得分加权融合的混合搜索。

In [None]:
class DocumentSearchSystem:
    """Search one document collection by keyword, semantic or hybrid scoring.

    Two indexes are built by ``load_documents``:
    - ``basic_search``: a ``BasicVectorSearch`` over dense TF-IDF vectors,
    - ``faiss_search``: a ``FAISSSearch`` over sentence-transformer vectors,
      or over L2-normalised TF-IDF vectors when the sentence-transformer
      path raises.
    """

    def __init__(self):
        self.documents = []
        self.vectorizer = TextVectorizer()
        self.tfidf_vectors = None
        self.semantic_vectors = None
        self.basic_search = None
        self.faiss_search = None

    def load_documents(self, documents):
        """Store ``documents`` and build both indexes from their ``'content'``.

        Args:
            documents: List of dicts; each must provide ``'content'``.
        """
        self.documents = documents
        texts = [doc['content'] for doc in documents]

        print("正在构建TF-IDF索引...")
        self.tfidf_vectors = self.vectorizer.tfidf_vectorize(texts)
        self.basic_search = BasicVectorSearch()
        self.basic_search.add_vectors(self.tfidf_vectors.toarray())

        print("正在构建语义向量索引...")
        try:
            self.semantic_vectors = self.vectorizer.sentence_transformer_vectorize(texts)
            self.faiss_search = FAISSSearch(vector_dim=self.semantic_vectors.shape[1])
            self.faiss_search.add_vectors(self.semantic_vectors)
        except Exception as e:
            # Best-effort fallback: reuse the TF-IDF vectors (L2-normalised,
            # float32) as the "semantic" index.
            print(f"语义向量构建失败: {e}")
            print("将使用TF-IDF作为备选")
            normalized_tfidf = normalize(self.tfidf_vectors.toarray().astype('float32'))
            self.faiss_search = FAISSSearch(vector_dim=normalized_tfidf.shape[1])
            self.faiss_search.add_vectors(normalized_tfidf)
            self.semantic_vectors = normalized_tfidf

    def keyword_search(self, query, top_k=5):
        """Rank documents against ``query`` using the TF-IDF index.

        Returns:
            List of dicts with ``'rank'`` (1-based position in the raw result
            list), ``'document'``, ``'similarity'`` and ``'type'='keyword'``.
        """
        # NOTE(review): tfidf_vectorize is called on the query alone. If that
        # call refits the vocabulary, the query vector is not in the same
        # space as the indexed vectors -- confirm TextVectorizer reuses the
        # fitted vocabulary here.
        query_vector = self.vectorizer.tfidf_vectorize([query])
        similarities, indices = self.basic_search.search(query_vector.toarray()[0], top_k)

        results = []
        for i, (idx, sim) in enumerate(zip(indices, similarities)):
            # Lower bound added: a negative index would otherwise wrap around
            # and return a document counted from the end of the list.
            if 0 <= idx < len(self.documents):
                results.append({
                    'rank': i + 1,
                    'document': self.documents[idx],
                    'similarity': sim,
                    'type': 'keyword'
                })
        return results

    def semantic_search(self, query, top_k=5):
        """Rank documents against ``query`` using the FAISS-backed index.

        Returns:
            List of dicts with ``'rank'``, ``'document'``, ``'similarity'``
            (``1 / (1 + distance)``), ``'distance'`` and ``'type'='semantic'``.
        """
        try:
            query_vector = self.vectorizer.sentence_transformer_vectorize([query])
        except Exception:
            # Mirrors the fallback in load_documents. ``except Exception``
            # instead of a bare ``except`` so Ctrl-C is not swallowed.
            # NOTE(review): if the index was built from sentence-transformer
            # vectors but this call fails, the TF-IDF query vector will
            # presumably have a different dimension -- confirm.
            query_vector = self.vectorizer.tfidf_vectorize([query])
            query_vector = normalize(query_vector.toarray().astype('float32'))

        distances, indices = self.faiss_search.search(query_vector, top_k)

        results = []
        for i, (idx, dist) in enumerate(zip(indices[0], distances[0])):
            # FAISS reports -1 for slots it could not fill (fewer than top_k
            # vectors indexed); without the lower bound that would silently
            # map to the last document.
            if 0 <= idx < len(self.documents):
                # Convert distance to a (0, 1] score.
                # NOTE(review): assumes a non-negative distance metric such
                # as L2 -- confirm against FAISSSearch.
                similarity = 1 / (1 + dist)
                results.append({
                    'rank': i + 1,
                    'document': self.documents[idx],
                    'similarity': similarity,
                    'distance': dist,
                    'type': 'semantic'
                })
        return results

    def hybrid_search(self, query, top_k=5, alpha=0.7):
        """Blend keyword and semantic scores and return the best ``top_k``.

        Each mode is queried for ``top_k * 2`` candidates. A document's final
        score is ``alpha * keyword_similarity + (1 - alpha) *
        semantic_similarity``; a mode in which the document did not appear
        contributes 0. Documents are keyed by ``document['id']``.

        Args:
            query: Query text.
            top_k: Number of results to return.
            alpha: Weight of the keyword score; ``1 - alpha`` weights the
                semantic score.
        """
        keyword_results = self.keyword_search(query, top_k*2)
        semantic_results = self.semantic_search(query, top_k*2)

        # Accumulate the weighted score per document id.
        combined_scores = defaultdict(float)
        doc_info = {}

        for result in keyword_results:
            doc_id = result['document']['id']
            combined_scores[doc_id] += alpha * result['similarity']
            doc_info[doc_id] = result['document']

        for result in semantic_results:
            doc_id = result['document']['id']
            combined_scores[doc_id] += (1 - alpha) * result['similarity']
            doc_info[doc_id] = result['document']

        # Highest combined score first, truncated to top_k.
        sorted_docs = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]

        results = []
        for i, (doc_id, score) in enumerate(sorted_docs):
            results.append({
                'rank': i + 1,
                'document': doc_info[doc_id],
                'similarity': score,
                'type': 'hybrid'
            })

        return results

# 初始化搜索系统
search_system = DocumentSearchSystem()
documents = load_documents('../data/sample_documents.json')
search_system.load_documents(documents)

print(f"\n文档搜索系统已准备就绪，共加载 {len(documents)} 个文档")

In [None]:
# 测试不同的搜索方法
test_queries = [
    "人工智能发展",
    "体育运动健康",
    "艺术创作灵感",
    "科技创新应用"
]

def display_search_results(results, query, method_name):
    """Print a ranked result list for one query.

    Args:
        results: Iterable of dicts with ``'rank'``, ``'similarity'`` and a
            ``'document'`` dict providing ``'title'``, ``'category'`` and
            ``'content'``.
        query: Query text, echoed in the header.
        method_name: Label of the search method, echoed in the header.
    """
    print(f"\n{method_name} - 查询: '{query}'")
    print("="*60)
    for hit in results:
        document = hit['document']
        rank, title, score = hit['rank'], document['title'], hit['similarity']
        print(f"{rank}. {title} (相似度: {score:.3f})")
        print(f"   类别: {document['category']}")
        # Only the first 80 characters of the content are shown.
        preview = document['content'][:80]
        print(f"   内容: {preview}...")
        print()

# Run all three search strategies for every test query. Previously only
# test_queries[0] was exercised, leaving the other queries unused.
for query in test_queries:
    print(f"测试查询: '{query}'")

    # Keyword (TF-IDF) search
    keyword_results = search_system.keyword_search(query, top_k=3)
    display_search_results(keyword_results, query, "关键词搜索")

    # Semantic search
    semantic_results = search_system.semantic_search(query, top_k=3)
    display_search_results(semantic_results, query, "语义搜索")

    # Hybrid search (weighted blend of the two above)
    hybrid_results = search_system.hybrid_search(query, top_k=3)
    display_search_results(hybrid_results, query, "混合搜索")

## 2. 推荐系统

基于向量相似度的内容推荐系统。

In [None]:
class ContentRecommendationSystem:
    """Content-based recommender built on document vectors.

    Every document is embedded once (``item_vectors``). A user profile is the
    normalised mean of the vectors the user liked, optionally pushed away
    from disliked items. Recommendations rank all items by cosine similarity
    to that profile.
    """

    def __init__(self, documents):
        """Store ``documents`` and embed them immediately.

        Args:
            documents: List of dicts; each must provide ``'content'``.
        """
        self.documents = documents
        self.vectorizer = TextVectorizer()
        self.item_vectors = None
        self.user_profiles = {}
        self._build_item_vectors()

    def _build_item_vectors(self):
        """Embed every document's ``'content'`` into ``self.item_vectors``."""
        print("构建物品向量...")
        texts = [doc['content'] for doc in self.documents]

        try:
            self.item_vectors = self.vectorizer.sentence_transformer_vectorize(texts)
        except Exception:
            # Fallback to L2-normalised TF-IDF. ``except Exception`` instead
            # of a bare ``except`` so Ctrl-C / SystemExit still propagate.
            tfidf_vectors = self.vectorizer.tfidf_vectorize(texts)
            self.item_vectors = normalize(tfidf_vectors.toarray().astype('float32'))

        print(f"物品向量维度: {self.item_vectors.shape}")

    def create_user_profile(self, user_id, liked_items, disliked_items=None):
        """Create (or overwrite) the profile of ``user_id``.

        Args:
            user_id: Key under which the profile is stored.
            liked_items: Item indices the user liked.
            disliked_items: Optional item indices the user disliked.

        Returns:
            The profile vector, or ``None`` (nothing stored) when no liked
            index is valid.

        Indices outside ``[0, len(item_vectors))`` are ignored and are not
        stored in the profile, so later lookups by stored index cannot go out
        of range.
        """
        n_items = len(self.item_vectors)
        valid_liked = [i for i in liked_items if 0 <= i < n_items]
        if not valid_liked:
            return None

        # The profile starts as the mean vector of the liked items.
        user_vector = np.mean([self.item_vectors[i] for i in valid_liked], axis=0)

        # Negative feedback pulls the profile away with a smaller weight (0.3).
        valid_disliked = [i for i in (disliked_items or []) if 0 <= i < n_items]
        if valid_disliked:
            disliked_mean = np.mean([self.item_vectors[i] for i in valid_disliked], axis=0)
            user_vector = user_vector - 0.3 * disliked_mean

        # Normalise to unit length; a zero vector is left as is, because
        # dividing by a zero norm would fill the profile with NaN.
        norm = np.linalg.norm(user_vector)
        if norm > 0:
            user_vector = user_vector / norm

        self.user_profiles[user_id] = {
            'vector': user_vector,
            'liked_items': valid_liked,
            'disliked_items': valid_disliked
        }

        return user_vector

    def recommend_items(self, user_id, top_k=5, exclude_seen=True):
        """Return the ``top_k`` items most similar to the user's profile.

        Args:
            user_id: Profile key; an unknown user yields ``[]``.
            top_k: Maximum number of recommendations.
            exclude_seen: When true, items the user liked or disliked are
                left out.

        Returns:
            List of dicts with ``'item_id'``, ``'document'``, ``'similarity'``
            and ``'score'`` (equal to the similarity), best first.
        """
        if user_id not in self.user_profiles:
            return []

        profile = self.user_profiles[user_id]
        user_vector = profile['vector']
        liked_items = set(profile['liked_items'])
        disliked_items = set(profile['disliked_items'])

        # Cosine similarity between the profile and every item.
        similarities = cosine_similarity([user_vector], self.item_vectors)[0]

        recommendations = []
        for i, sim in enumerate(similarities):
            if exclude_seen and (i in liked_items or i in disliked_items):
                continue

            recommendations.append({
                'item_id': i,
                'document': self.documents[i],
                'similarity': sim,
                'score': sim
            })

        recommendations.sort(key=lambda x: x['score'], reverse=True)

        return recommendations[:top_k]

    def explain_recommendation(self, user_id, item_id):
        """Explain why ``item_id`` scores the way it does for ``user_id``.

        Returns:
            A string holding the profile/item similarity and, when the user
            has liked items, the title of the liked item closest to
            ``item_id``. For an unknown user a fixed message is returned.
        """
        if user_id not in self.user_profiles:
            return "用户画像不存在"

        user_profile = self.user_profiles[user_id]
        user_vector = user_profile['vector']
        item_vector = self.item_vectors[item_id]

        similarity = cosine_similarity([user_vector], [item_vector])[0][0]

        # Find the liked item that is most similar to the recommended one.
        liked_similarities = []
        for liked_id in user_profile['liked_items']:
            liked_vector = self.item_vectors[liked_id]
            liked_sim = cosine_similarity([item_vector], [liked_vector])[0][0]
            liked_similarities.append((liked_id, liked_sim))

        liked_similarities.sort(key=lambda x: x[1], reverse=True)
        most_similar_liked = liked_similarities[0] if liked_similarities else None

        explanation = f"推荐评分: {similarity:.3f}\n"
        if most_similar_liked:
            similar_doc = self.documents[most_similar_liked[0]]
            explanation += f"因为与您喜欢的'{similar_doc['title']}'相似 (相似度: {most_similar_liked[1]:.3f})"

        return explanation

# 初始化推荐系统
rec_system = ContentRecommendationSystem(documents)
print("推荐系统初始化完成")

In [None]:
# Build three synthetic users from the document categories.
print("创建模拟用户...")

# Document indices per category of interest.
tech_docs = [idx for idx, doc in enumerate(documents) if doc['category'] == '科技']
sports_docs = [idx for idx, doc in enumerate(documents) if doc['category'] == '体育']

# user1 likes up to two tech documents, user2 up to two sports documents,
# user3 likes one of each (slicing already copes with shorter lists).
user1_liked = tech_docs[:2]
user2_liked = sports_docs[:2]
mixed_docs = tech_docs[:1] + sports_docs[:1]

for profile_name, liked in (('user1', user1_liked),
                            ('user2', user2_liked),
                            ('user3', mixed_docs)):
    rec_system.create_user_profile(profile_name, liked)

print(f"用户1 喜欢的文档: {[documents[i]['title'] for i in user1_liked]}")
print(f"用户2 喜欢的文档: {[documents[i]['title'] for i in user2_liked]}")
print(f"用户3 喜欢的文档: {[documents[i]['title'] for i in mixed_docs]}")

In [None]:
# 为用户生成推荐
def display_recommendations(user_id, recommendations):
    """Print the recommendation list of one user with a short reason each.

    Args:
        user_id: User whose recommendations are shown; also passed to the
            module-level ``rec_system`` to obtain the explanation.
        recommendations: Iterable of dicts with ``'document'``, ``'score'``
            and ``'item_id'``.
    """
    print(f"\n为 {user_id} 的推荐:")
    print("="*50)
    for position, rec in enumerate(recommendations, start=1):
        doc = rec['document']
        print(f"{position}. {doc['title']} (评分: {rec['score']:.3f})")
        print(f"   类别: {doc['category']}")
        print(f"   内容: {doc['content'][:60]}...")

        # Keep only the text after the first '因为' marker of the explanation;
        # fall back to a generic reason when the marker is absent.
        explanation = rec_system.explain_recommendation(user_id, rec['item_id'])
        if '因为' in explanation:
            reason = explanation.split('因为')[1]
        else:
            reason = '基于用户偏好'
        print(f"   推荐原因: {reason}")
        print()

# 为每个用户生成推荐
for user_id in ['user1', 'user2', 'user3']:
    recommendations = rec_system.recommend_items(user_id, top_k=3)
    display_recommendations(user_id, recommendations)

## 3. 语义相似度分析

分析文档之间的语义关系和聚类。

In [None]:
# 计算文档间的相似度矩阵
print("计算文档相似度矩阵...")
similarity_matrix = cosine_similarity(rec_system.item_vectors)

# 可视化相似度矩阵
plt.figure(figsize=(12, 10))
mask = np.triu(np.ones_like(similarity_matrix, dtype=bool))
sns.heatmap(similarity_matrix, mask=mask, annot=False, cmap='YlOrRd', 
            square=True, linewidths=0.5, cbar_kws={"shrink": .8})
plt.title('文档相似度矩阵')
plt.xlabel('文档索引')
plt.ylabel('文档索引')
plt.show()

# 找出最相似的文档对
print("\n最相似的文档对:")
print("="*40)

# 去除对角线和重复的对
n_docs = len(documents)
similar_pairs = []
for i in range(n_docs):
    for j in range(i+1, n_docs):
        similar_pairs.append((i, j, similarity_matrix[i, j]))

# 排序并显示前5对
similar_pairs.sort(key=lambda x: x[2], reverse=True)
for i, (doc1_idx, doc2_idx, sim) in enumerate(similar_pairs[:5]):
    doc1 = documents[doc1_idx]
    doc2 = documents[doc2_idx]
    print(f"{i+1}. 相似度: {sim:.3f}")
    print(f"   文档1: {doc1['title']} [{doc1['category']}]")
    print(f"   文档2: {doc2['title']} [{doc2['category']}]")
    print()

In [None]:
# 类别内部和类别间的相似度分析
def analyze_category_similarities():
    """Report average similarity within and between document categories.

    Reads the module-level ``documents`` and ``similarity_matrix``. Every
    unordered document pair ``(i, j)`` with ``i < j`` is assigned either to
    its shared category or to the sorted pair of its two categories.

    Returns:
        Tuple ``(intra_category_sims, inter_category_sims)``: two
        ``defaultdict(list)`` objects mapping a category (respectively a
        sorted category pair) to the list of pairwise similarities.
    """
    categories = [doc['category'] for doc in documents]
    # dict.fromkeys de-duplicates while keeping first-appearance order, so the
    # report is printed in the same order on every run. Iterating a set of
    # strings can produce a different order in each interpreter session.
    unique_categories = list(dict.fromkeys(categories))

    intra_category_sims = defaultdict(list)  # similarities inside one category
    inter_category_sims = defaultdict(list)  # similarities across two categories

    for i in range(len(documents)):
        for j in range(i+1, len(documents)):
            sim = similarity_matrix[i, j]
            cat1, cat2 = categories[i], categories[j]

            if cat1 == cat2:
                intra_category_sims[cat1].append(sim)
            else:
                # Sort so that (A, B) and (B, A) share one bucket.
                pair_key = tuple(sorted([cat1, cat2]))
                inter_category_sims[pair_key].append(sim)

    # Within-category averages; categories with a single document have no
    # pairs and are skipped.
    print("类别内平均相似度:")
    print("-" * 30)
    for cat in unique_categories:
        if cat in intra_category_sims:
            avg_sim = np.mean(intra_category_sims[cat])
            print(f"{cat}: {avg_sim:.3f} (样本数: {len(intra_category_sims[cat])})")

    # Between-category averages.
    print("\n类别间平均相似度:")
    print("-" * 30)
    for pair, sims in inter_category_sims.items():
        avg_sim = np.mean(sims)
        print(f"{pair[0]} vs {pair[1]}: {avg_sim:.3f} (样本数: {len(sims)})")

    return intra_category_sims, inter_category_sims

intra_sims, inter_sims = analyze_category_similarities()

## 4. 搜索系统评估

评估不同搜索方法的效果。

In [None]:
class SearchEvaluator:
    """Compare the search methods of a search system with P/R/F1 at K."""

    def __init__(self, search_system, documents):
        """
        Args:
            search_system: Object exposing ``keyword_search``,
                ``semantic_search`` and ``hybrid_search``, each called as
                ``(query, top_k=k)``.
            documents: List of document dicts with ``'id'``, ``'category'``
                and ``'content'``.
        """
        self.search_system = search_system
        self.documents = documents

    def create_test_queries(self):
        """Build three fixed test cases and label their relevant documents.

        A document is relevant to a test case when its category is listed in
        ``'relevant_categories'`` or, failing that, when its lower-cased
        content contains any of the ``'relevant_keywords'``.

        Returns:
            List of dicts with ``'query'``, ``'relevant_categories'``,
            ``'relevant_keywords'`` and ``'relevant_docs'`` (document indices).
        """
        test_cases = [
            {
                'query': '人工智能机器学习',
                'relevant_categories': ['科技'],
                'relevant_keywords': ['人工智能', '机器学习', '算法', '技术']
            },
            {
                'query': '体育运动健身',
                'relevant_categories': ['体育'],
                'relevant_keywords': ['体育', '运动', '健身', '锻炼']
            },
            {
                'query': '艺术创作文化',
                'relevant_categories': ['艺术'],
                'relevant_keywords': ['艺术', '创作', '文化', '音乐']
            }
        ]

        for test_case in test_cases:
            relevant_docs = []
            for i, doc in enumerate(self.documents):
                # Category match first ...
                if doc['category'] in test_case['relevant_categories']:
                    relevant_docs.append(i)
                # ... otherwise fall back to a keyword match on the content.
                else:
                    content_lower = doc['content'].lower()
                    if any(keyword in content_lower for keyword in test_case['relevant_keywords']):
                        relevant_docs.append(i)

            test_case['relevant_docs'] = relevant_docs

        return test_cases

    def calculate_metrics(self, retrieved_docs, relevant_docs, k=5):
        """Compute precision, recall and F1 over the first ``k`` retrieved docs.

        Args:
            retrieved_docs: Retrieved document indices, best first.
            relevant_docs: Indices of the relevant documents.
            k: Cut-off applied to ``retrieved_docs``.

        Returns:
            Dict with ``'precision'``, ``'recall'``, ``'f1'`` and
            ``'relevant_retrieved'``. Precision is divided by the number of
            distinct documents actually retrieved (not by ``k``); each ratio
            is 0 when its denominator is empty.
        """
        retrieved_set = set(retrieved_docs[:k])
        relevant_set = set(relevant_docs)

        intersection = retrieved_set.intersection(relevant_set)

        # Share of retrieved documents that are relevant.
        precision = len(intersection) / len(retrieved_set) if retrieved_set else 0

        # Share of relevant documents that were retrieved.
        recall = len(intersection) / len(relevant_set) if relevant_set else 0

        # Harmonic mean of the two.
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        return {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'relevant_retrieved': len(intersection)
        }

    def evaluate_search_methods(self, test_cases, k=5):
        """Run every test case through every search method.

        Args:
            test_cases: Output of ``create_test_queries``.
            k: ``top_k`` passed to the search methods and the metric cut-off.

        Returns:
            Dict mapping the method label to a list of metric dicts (one per
            test case, each extended with its ``'query'``).
        """
        methods = {
            '关键词搜索': self.search_system.keyword_search,
            '语义搜索': self.search_system.semantic_search,
            '混合搜索': self.search_system.hybrid_search
        }

        # Resolve document id -> list index once instead of scanning the
        # whole list for every single result. setdefault keeps the FIRST
        # index for a duplicated id, matching the previous linear scan.
        index_by_id = {}
        for position, doc in enumerate(self.documents):
            if 'id' in doc:
                index_by_id.setdefault(doc['id'], position)

        results = {}

        for method_name, search_func in methods.items():
            method_metrics = []

            for test_case in test_cases:
                query = test_case['query']
                relevant_docs = test_case['relevant_docs']

                search_results = search_func(query, top_k=k)

                # Translate result ids to indices; unknown ids are skipped.
                retrieved_docs = []
                for result in search_results:
                    doc_idx = index_by_id.get(result['document']['id'], -1)
                    if doc_idx != -1:
                        retrieved_docs.append(doc_idx)

                metrics = self.calculate_metrics(retrieved_docs, relevant_docs, k)
                metrics['query'] = query
                method_metrics.append(metrics)

            results[method_name] = method_metrics

        return results

# 初始化评估器
evaluator = SearchEvaluator(search_system, documents)
test_cases = evaluator.create_test_queries()

print("创建的测试用例:")
for i, test_case in enumerate(test_cases):
    print(f"{i+1}. 查询: '{test_case['query']}'")
    print(f"   相关文档数: {len(test_case['relevant_docs'])}")
    print(f"   相关类别: {test_case['relevant_categories']}")
    print()

In [None]:
# 执行评估
print("执行搜索系统评估...")
evaluation_results = evaluator.evaluate_search_methods(test_cases, k=5)

# 显示评估结果
print("\n评估结果:")
print("="*60)

for method_name, method_results in evaluation_results.items():
    print(f"\n{method_name}:")
    print("-" * 40)
    
    all_precision = [r['precision'] for r in method_results]
    all_recall = [r['recall'] for r in method_results]
    all_f1 = [r['f1'] for r in method_results]
    
    print(f"平均 Precision@5: {np.mean(all_precision):.3f}")
    print(f"平均 Recall@5: {np.mean(all_recall):.3f}")
    print(f"平均 F1-Score: {np.mean(all_f1):.3f}")
    
    # 详细结果
    for result in method_results:
        print(f"  查询: '{result['query']}' - P:{result['precision']:.2f} R:{result['recall']:.2f} F1:{result['f1']:.2f}")

In [None]:
# 可视化评估结果
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
metrics = ['precision', 'recall', 'f1']
metric_names = ['Precision@5', 'Recall@5', 'F1-Score']

for i, (metric, metric_name) in enumerate(zip(metrics, metric_names)):
    method_names = list(evaluation_results.keys())
    method_scores = []
    
    for method_name in method_names:
        scores = [r[metric] for r in evaluation_results[method_name]]
        method_scores.append(np.mean(scores))
    
    bars = axes[i].bar(method_names, method_scores, 
                      color=['blue', 'orange', 'green'], alpha=0.7)
    axes[i].set_title(f'{metric_name} 比较')
    axes[i].set_ylabel(metric_name)
    axes[i].set_ylim(0, 1)
    
    # 添加数值标签
    for bar, score in zip(bars, method_scores):
        axes[i].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                    f'{score:.3f}', ha='center', va='bottom')
    
    axes[i].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

## 5. 性能优化实验

测试不同参数和方法对性能的影响。

In [None]:
# 测试混合搜索中不同alpha值的影响
print("测试混合搜索参数优化...")

alpha_values = [0.1, 0.3, 0.5, 0.7, 0.9]
alpha_results = []

test_query = test_cases[0]['query']
relevant_docs = test_cases[0]['relevant_docs']

for alpha in alpha_values:
    # 修改混合搜索的alpha参数
    hybrid_results = search_system.hybrid_search(test_query, top_k=5, alpha=alpha)
    
    # 提取文档索引
    retrieved_docs = []
    for result in hybrid_results:
        doc_id = result['document']['id']
        doc_idx = next((i for i, d in enumerate(documents) if d['id'] == doc_id), -1)
        if doc_idx != -1:
            retrieved_docs.append(doc_idx)
    
    # 计算指标
    metrics = evaluator.calculate_metrics(retrieved_docs, relevant_docs, 5)
    metrics['alpha'] = alpha
    alpha_results.append(metrics)

# 可视化alpha值的影响
alphas = [r['alpha'] for r in alpha_results]
precisions = [r['precision'] for r in alpha_results]
recalls = [r['recall'] for r in alpha_results]
f1s = [r['f1'] for r in alpha_results]

plt.figure(figsize=(10, 6))
plt.plot(alphas, precisions, 'o-', label='Precision', linewidth=2)
plt.plot(alphas, recalls, 's-', label='Recall', linewidth=2)
plt.plot(alphas, f1s, '^-', label='F1-Score', linewidth=2)
plt.xlabel('Alpha值 (关键词搜索权重)')
plt.ylabel('评估指标')
plt.title('混合搜索Alpha参数对性能的影响')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# 找到最佳alpha值
best_alpha_idx = np.argmax(f1s)
best_alpha = alphas[best_alpha_idx]
print(f"\n最佳Alpha值: {best_alpha} (F1-Score: {f1s[best_alpha_idx]:.3f})")

## 6. 总结和应用建议

基于实验结果的应用建议。

In [None]:
print("向量搜索应用总结:")
print("="*50)

# 分析评估结果
method_performance = {}
for method_name, method_results in evaluation_results.items():
    avg_f1 = np.mean([r['f1'] for r in method_results])
    method_performance[method_name] = avg_f1

best_method = max(method_performance, key=method_performance.get)
print(f"\n最佳搜索方法: {best_method} (平均F1: {method_performance[best_method]:.3f})")

print("\n应用场景建议:")
print("-" * 30)

scenarios = {
    "关键词精确匹配": {
        "推荐方法": "TF-IDF + 关键词搜索",
        "适用场景": "法律文档检索、产品规格查询",
        "优点": "快速、精确匹配关键词",
        "缺点": "无法理解语义关系"
    },
    "语义理解搜索": {
        "推荐方法": "Sentence Transformers + FAISS",
        "适用场景": "智能客服、知识问答",
        "优点": "理解语义，召回相关内容",
        "缺点": "计算资源需求大"
    },
    "综合搜索系统": {
        "推荐方法": "混合搜索 (最佳Alpha值)",
        "适用场景": "通用搜索引擎、内容推荐",
        "优点": "平衡精确性和语义理解",
        "缺点": "参数调优复杂"
    },
    "大规模推荐": {
        "推荐方法": "FAISS + 用户画像",
        "适用场景": "电商推荐、内容推荐",
        "优点": "可扩展、个性化",
        "缺点": "冷启动问题"
    }
}

for scenario, details in scenarios.items():
    print(f"\n{scenario}:")
    for key, value in details.items():
        print(f"  {key}: {value}")

print("\n\n技术选型决策树:")
print("-" * 40)
decision_tree = [
    "1. 数据规模 < 10K？ → 使用基础向量搜索",
    "2. 需要精确关键词匹配？ → 使用TF-IDF",
    "3. 需要语义理解？ → 使用Sentence Transformers",
    "4. 实时性要求高？ → 使用FAISS加速",
    "5. 个性化需求？ → 构建用户画像系统",
    "6. 效果要求最高？ → 使用混合搜索并调优参数"
]

for decision in decision_tree:
    print(f"  {decision}")

print("\n实验完成！向量搜索技术已成功应用于多个实际场景。")