## 以markdown格式的文本为例，分割文本块和构建faiss索引

### 文本分块，每个不超过max_tokens，支持自定义重叠

In [47]:
import re
import os
import json
import markdown2 
from bs4 import BeautifulSoup
from typing import List, Dict, Any
import requests
import numpy as np

def detect_language(text: str) -> str:
    """自动检测文本主要语言（中文/英文）"""
    chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
    english_chars = len(re.findall(r'[a-zA-Z]', text))
    return 'zh' if chinese_chars > english_chars else 'en'

def split_text_into_chunks(
        text: str,
        max_tokens: int = 500,
        overlap: int = 50,
        language: str = None
) -> List[str]:
    """将长文本切分为多个chunk，保留单词完整性并支持重叠

    Args:
        text: 输入文本
        max_tokens: 每个chunk的最大长度
        overlap: 相邻chunk之间的重叠长度
        language: 可选强制指定语言('zh'/'en')，默认自动检测

    Returns:
        切分后的chunk列表
    """
    # 清理文本并检测语言
    cleaned = re.sub(r'\s+', ' ', text).strip()
    if not cleaned:
        return []

    lang = language or detect_language(cleaned)

    # 中英文不同的分割逻辑
    if lang == 'zh':
        # 中文按句子分割（保留标点）
        sentences = [s for s in re.split(r'(?<=[。！？.!?])', cleaned) if s]
    else:
        # 英文按句子分割（保留单词完整性）
        sentences = [s for s in re.split(r'(?<=[.!?])\s+', cleaned) if s]

    chunks = []
    current_chunk = ""

    for sentence in sentences:
        # 处理超长句子（超过max_tokens）
        if len(sentence) > max_tokens:
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = ""

            if lang == 'zh':
                # 中文按字符分割（保留重叠）
                chunks.extend([sentence[i:i + max_tokens]
                               for i in range(0, len(sentence), max_tokens - overlap)])
            else:
                # 英文按单词分割（保留单词完整性）
                words = sentence.split()
                current_words = []
                for word in words:
                    if len(' '.join(current_words + [word])) <= max_tokens:
                        current_words.append(word)
                    else:
                        chunks.append(' '.join(current_words))
                        # 保留重叠部分（从尾部取单词）
                        overlap_words = current_words[-overlap:] if overlap < len(current_words) else current_words
                        current_words = overlap_words + [word]
                if current_words:
                    chunks.append(' '.join(current_words))
            continue

        # 正常句子处理
        if len(current_chunk) + len(sentence) > max_tokens:
            if chunks:
                # 计算实际重叠长度（不超过剩余空间）
                effective_overlap = min(
                    overlap,
                    len(current_chunk),
                    max_tokens - len(sentence)
                )
                overlap_part = current_chunk[-effective_overlap:]
                chunks.append(current_chunk)
                current_chunk = overlap_part + sentence
            else:
                # 第一个chunk直接超限
                chunks.append(current_chunk)
                current_chunk = sentence
        else:
            current_chunk += sentence

    if current_chunk:
        chunks.append(current_chunk)

    # 后处理：确保所有chunk都不超限
    final_chunks = []
    for chunk in chunks:
        if len(chunk) > max_tokens:
            if lang == 'zh':
                final_chunks.extend([chunk[i:i + max_tokens]
                                     for i in range(0, len(chunk), max_tokens - overlap)])
            else:
                words = chunk.split()
                current_words = []
                for word in words:
                    if len(' '.join(current_words + [word])) <= max_tokens:
                        current_words.append(word)
                    else:
                        final_chunks.append(' '.join(current_words))
                        current_words = [word]
                if current_words:
                    final_chunks.append(' '.join(current_words))
        else:
            final_chunks.append(chunk)

    return final_chunks

### 提取MD文件中的文本内容

In [48]:
def extract_text_from_md(file_path: str) -> str:
    """提取MD文件中的文本内容"""
    with open(file_path, 'r', encoding='utf-8') as f:
        md_content = f.read()
    # 将MD转换为HTML
    html = markdown2.markdown(md_content)
    # 从HTML中提取纯文本
    soup = BeautifulSoup(html, 'html.parser')
    return soup.get_text()


### 将文本块保存为JSON文件

In [49]:
def save_chunks_to_json(chunks: List[str], output_path: str) -> None:
    """将文本块保存为JSON文件"""
    data = [{"id": i, "text": chunk, "length": len(chunk)} for i, chunk in enumerate(chunks)]
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

### 使用本地embedding模型生成文本嵌入向量，支持分批处理

In [50]:
def generate_embeddings(texts: List[str], model: str = "dengcao/Qwen3-Embedding-8B:Q5_K_M", batch_size: int = 10) -> List[List[float]]:
    """
    使用本地Ollama模型生成文本嵌入向量
    
    参数:
        texts (List[str]): 文本块列表
        model (str): 本地部署的Ollama模型名称
        batch_size (int): 单次批量处理数量
        
    返回:
        List[List[float]]: 文本嵌入向量列表
    """
    all_embeddings = []
    url = "http://localhost:11434/api/embeddings"  # Ollama默认API地址
    
    # 分批处理文本
    total_batches = (len(texts) + batch_size - 1) // batch_size
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        batch_num = (i // batch_size) + 1
        print(f"正在处理第 {batch_num}/{total_batches} 批，包含 {len(batch)} 个文本块")
        
        batch_embeddings = []
        for idx, text in enumerate(batch):
            try:
                # 构造Ollama API请求
                payload = {
                    "model": model,
                    "prompt": text,
                    "options": {"temperature": 0.0}  # 确保确定性结果
                }
                
                # 发送请求到本地Ollama服务
                response = requests.post(url, json=payload, timeout=60)
                response.raise_for_status()  # 检查HTTP错误
                
                # 解析响应并获取嵌入向量
                embedding_data = response.json()
                embedding = embedding_data.get("embedding")
                
                if embedding and isinstance(embedding, list):
                    batch_embeddings.append(embedding)
                    print(f"✓ 文本块 {idx+1}/{len(batch)} 完成")
                else:
                    raise ValueError("API响应格式错误，未获取到有效嵌入向量")
                    
            except requests.exceptions.RequestException as e:
                print(f"✗ 文本块 {idx+1} 请求失败: {e}")
                batch_embeddings.append([])  # 添加空向量占位
            except ValueError as ve:
                print(f"✗ 文本块 {idx+1} 解析失败: {ve}")
                batch_embeddings.append([])  # 添加空向量占位
            except Exception as e:
                print(f"✗ 文本块 {idx+1} 未知错误: {e}")
                batch_embeddings.append([])  # 添加空向量占位
                
        # 过滤无效嵌入向量
        valid_embeddings = [emb for emb in batch_embeddings if emb]
        
        # 验证嵌入向量维度一致性
        if valid_embeddings:
            ref_dim = len(valid_embeddings[0])
            for emb in valid_embeddings:
                if len(emb) != ref_dim:
                    print(f"警告! 嵌入向量维度不一致: {len(emb)} vs {ref_dim}")
                    valid_embeddings = []  # 清除非一致批次
                    break
                
        # 添加有效嵌入向量到结果集
        if valid_embeddings:
            all_embeddings.extend(valid_embeddings)
            print(f"第 {batch_num} 批处理完成，获取 {len(valid_embeddings)} 个有效嵌入向量")
        else:
            print(f"✗ 第 {batch_num} 批未获取到有效嵌入向量")
            
        # 添加空向量保持原始顺序
        all_embeddings.extend([[]] * (len(batch_embeddings) - len(valid_embeddings)))
        
    # 最终维度验证
    embedding_dims = {len(emb) for emb in all_embeddings if emb}
    if len(embedding_dims) > 1:
        print(f"警告! 存在不一致的嵌入维度: {embedding_dims}")
    elif embedding_dims:
        print(f"所有嵌入向量维度一致: {next(iter(embedding_dims))}")
    
    return all_embeddings


### 将文本块及其向量保存为JSON文件

In [51]:
def save_embeddings_to_json(chunks: List[Dict[str, Any]], embeddings: List[List[float]], output_path: str) -> None:
    if len(chunks) != len(embeddings):
        raise ValueError("文本块数量与向量数量不匹配")
    
    # 将向量添加到元数据中
    for i, chunk in enumerate(chunks):
        chunk["embedding"] = embeddings[i]
    
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(chunks, f, ensure_ascii=False, indent=2)

### 创建并保存FAISS索引

In [52]:
def create_faiss_index(embeddings: List[List[float]], index_path: str) -> None:
    import faiss
    
    # 将Python列表转换为NumPy数组
    vectors = np.array(embeddings, dtype='float32')
    
    # 创建FAISS索引（使用FlatL2进行精确搜索）
    dimension = len(embeddings[0])
    index = faiss.IndexFlatL2(dimension)
    
    # 添加向量到索引
    index.add(vectors)
    
    # 保存索引
    faiss.write_index(index, index_path)
    print(f"FAISS索引已保存到: {index_path}")

### 处理单个文件

In [53]:
def process_single_file(file_path: str, output_dir: str) -> None:
    file_ext = os.path.splitext(file_path)[1].lower()
    file_name = os.path.basename(file_path)
    base_name = os.path.splitext(file_name)[0]
    
    # 确保输出目录存在
    os.makedirs(output_dir, exist_ok=True)
    
    # 提取文本
    if file_ext == '.md':
        text = extract_text_from_md(file_path)
    else:
        print(f"不支持的文件类型: {file_ext}")
        return
    
    # 分割文本
    chunks = split_text_into_chunks(text, max_tokens=400, overlap=50)
    
    # 保存文本块
    output_json_path = os.path.join(output_dir, f"{base_name}_chunks.json")
    save_chunks_to_json(chunks, output_json_path)
    print(f"文件 {file_name} 共分成 {len(chunks)} 段")
    print(f"文本块已保存到 {output_json_path}")
    
    # 生成向量
    print(f"正在使用embedding模型为 {file_name} 生成向量...")
    embeddings = generate_embeddings(chunks)
    
    if embeddings:
        # 转换为字典列表，添加id和length信息
        chunks_with_metadata = [{"id": i, "text": chunk, "length": len(chunk), "source_file": file_name} for i, chunk in enumerate(chunks)]
        
        # 保存带向量的JSON文件
        vector_json_path = os.path.join(output_dir, f"{base_name}_vectors.json")
        save_embeddings_to_json(chunks_with_metadata, embeddings, vector_json_path)
        print(f"向量已保存到 {vector_json_path}")
        
        # 创建并保存FAISS索引
        faiss_index_path = os.path.join(output_dir, f"{base_name}.index")
        create_faiss_index(embeddings, faiss_index_path)
    else:
        print(f"未能为 {file_name} 生成向量")

### 批量处理目录中的文件

In [54]:
def batch_process_files(input_dir: str, output_dir: str, file_extensions: List[str] = ['.pdf', '.md']) -> None:
    # 获取所有符合条件的文件
    files_to_process = []
    for root, _, files in os.walk(input_dir):
        for file in files:
            if any(file.lower().endswith(ext) for ext in file_extensions):
                files_to_process.append(os.path.join(root, file))
    
    if not files_to_process:
        print(f"在 {input_dir} 中没有找到符合条件的文件")
        return
    
    print(f"找到 {len(files_to_process)} 个文件需要处理")
    
    # 逐个处理文件
    for i, file_path in enumerate(files_to_process):
        print(f"\n===== 正在处理文件 {i+1}/{len(files_to_process)}: {os.path.basename(file_path)} =====")
        process_single_file(file_path, output_dir)

### 主程序入口

In [55]:
def main():
    # 配置参数
    input_dir = "./datas/input_files"  # 输入目录，包含PDF和MD文件
    output_dir = "./datas/output"  # 输出目录
    
    # 批量处理文件
    batch_process_files(input_dir, output_dir)

### 主程序入口

In [56]:

if __name__ == "__main__":
    main()

找到 4 个文件需要处理

===== 正在处理文件 1/4: 中国文化通史明代卷2.md =====
文件 中国文化通史明代卷2.md 共分成 442 段
文本块已保存到 ./datas/output/中国文化通史明代卷2_chunks.json
正在使用embedding模型为 中国文化通史明代卷2.md 生成向量...
正在处理第 1/45 批，包含 10 个文本块
✓ 文本块 1/10 完成
✓ 文本块 2/10 完成
✓ 文本块 3/10 完成
✓ 文本块 4/10 完成
✓ 文本块 5/10 完成
✓ 文本块 6/10 完成
✓ 文本块 7/10 完成
✓ 文本块 8/10 完成
✓ 文本块 9/10 完成
✓ 文本块 10/10 完成
第 1 批处理完成，获取 10 个有效嵌入向量
正在处理第 2/45 批，包含 10 个文本块
✓ 文本块 1/10 完成
✓ 文本块 2/10 完成
✓ 文本块 3/10 完成
✓ 文本块 4/10 完成
✓ 文本块 5/10 完成
✓ 文本块 6/10 完成
✓ 文本块 7/10 完成
✓ 文本块 8/10 完成
✓ 文本块 9/10 完成
✓ 文本块 10/10 完成
第 2 批处理完成，获取 10 个有效嵌入向量
正在处理第 3/45 批，包含 10 个文本块
✓ 文本块 1/10 完成
✓ 文本块 2/10 完成
✓ 文本块 3/10 完成
✓ 文本块 4/10 完成
✓ 文本块 5/10 完成
✓ 文本块 6/10 完成
✓ 文本块 7/10 完成
✓ 文本块 8/10 完成
✓ 文本块 9/10 完成
✓ 文本块 10/10 完成
第 3 批处理完成，获取 10 个有效嵌入向量
正在处理第 4/45 批，包含 10 个文本块
✓ 文本块 1/10 完成
✓ 文本块 2/10 完成
✓ 文本块 3/10 完成
✓ 文本块 4/10 完成
✓ 文本块 5/10 完成
✓ 文本块 6/10 完成
✓ 文本块 7/10 完成
✓ 文本块 8/10 完成
✓ 文本块 9/10 完成
✓ 文本块 10/10 完成
第 4 批处理完成，获取 10 个有效嵌入向量
正在处理第 5/45 批，包含 10 个文本块
✓ 文本块 1/10 完成
✓ 文本块 2/10 完成
✓ 文本块 3/10 完成
✓ 文本块 4/10 完成
