<a href="https://colab.research.google.com/github/ivytas0905/Alibaba_financial_report/blob/main/financial_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
!pip install -U langchain-openai pdfplumber numpy pypdf python-docx faiss-cpu sentence-transformers -U langchain-community python-dotenv

Collecting pypdf
  Downloading pypdf-5.9.0-py3-none-any.whl.metadata (7.1 kB)
Downloading pypdf-5.9.0-py3-none-any.whl (313 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.2/313.2 kB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pypdf
Successfully installed pypdf-5.9.0


In [13]:
import pdfplumber
import re
import json

from google.colab import drive
# Mount Google Drive so the report PDFs below can be read from it.
drive.mount('/content/drive')

# English and Chinese versions of the report, both stored on the mounted Drive.
pdf_path_en = "/content/drive/MyDrive/alibaba_en.pdf"
pdf_path_cn = "/content/drive/MyDrive/alibaba_cn.pdf"


# Module-level accumulator: the extraction functions below write into it and
# main() serializes it to JSON.
data = {
    "financial_statements": {
        "annual": {
            "FY2024": {
                "metrics": {},
                "special_items": {}
            },
            "FY2025": {
                "metrics": {},
                "special_items": {}
            }
        },
        "quarterly": {}
    },
    "business_segments": {
        "FY2025": {}
    },
    "risk_factors": [],
    "metadata": {
        "currency": "RMB Mn",
        "reporting_period": "Fiscal year ending March 31"
    }
 }

# Row labels of interest in extracted tables, keyed by the metric name stored
# in `data`. extract_financial_tables uses them as plain substrings to pick
# tables; process_financial_table uses them with re.search on each row label.
TABLE_PATTERNS = {
    "revenue": r"Revenue",
    "operating_income": r"Income from operations",
    "net_income": r"Net income",
    "adjusted_EBITA": r"Adjusted EBITA"
}
# Patterns for matching financial figures in the report text.
# Each regex has one capture group holding the number, which must be followed
# by one of the listed unit/currency tokens.

financial_pattern = {
    "total_revenue": r"Revenue[^\d]*([\d,.]+)\s*(?:billion|B|USD|RMB)",
    "net_profit": r"Net\s*income[^\d]*([\d,.]+)\s*(?:billion|B|USD|RMB)",
    "operating_cash_flow": r"Net\s*cash\s*provided\s*by\s*operating\s*activities[^\d]*([\d,.]+)\s*(?:million|M|RMB|USD)",
    "free_cash_flow": r"Free\s*cash\s*flow[^\d]*([\d,.]+)\s*(?:million|M|RMB|USD)",
    "adjusted_EBITA": r"Adjusted\s*EBITA[^\d]*([\d,.]+)\s*(?:billion|B|USD|RMB)"
}
# Patterns for matching business-segment data.
# Group 1 is read as the segment revenue and group 2 as its adjusted EBITA;
# extract_text_data searches them with re.DOTALL so ".*?" can span lines.

segment_patterns = {
    "Taobao_Tmall": r"Taobao and Tmall Group.*?Revenue.*?([\d,.]+).*?Adjusted EBITA.*?([\d,.\-]+)",
    "AIDC": r"Alibaba International Digital Commerce Group.*?Revenue.*?([\d,.]+).*?Adjusted EBITA.*?([\d,.\-]+)",
    "Cloud_Intelligence": r"Cloud Intelligence Group.*?Revenue.*?([\d,.]+).*?Adjusted EBITA.*?([\d,.\-]+)",
    "Cainiao": r"Cainiao Smart Logistics.*?Revenue.*?([\d,.]+).*?Adjusted EBITA.*?([\d,.\-]+)",
    "Local_Services": r"Local Services Group.*?Revenue.*?([\d,.]+).*?Adjusted EBITA.*?([\d,.\-]+)",
    "Digital_Media": r"Digital Media and Entertainment Group.*?Revenue.*?([\d,.]+).*?Adjusted EBITA.*?([\d,.\-]+)",
    "All_Others": r"All Others.*?Revenue.*?([\d,.]+).*?Adjusted EBITA.*?([\d,.\-]+)"
}

# Keywords matched as plain substrings against sentences of the Chinese report
# to collect risk-factor statements.
# NOTE(review): the first two entries contain a space, so they only match
# sentences containing that exact spaced phrase — confirm this is intended.
risk_keywords = ["宏观经济 风险", "市场竞争 风险", "地缘政治", "数据安全", "网络安全", "法规监管", "合规", "监管风险"]

def extract_financial_tables(pdf_path):
    """Scan every page of a PDF and pass financial-looking tables to the parser.

    A table qualifies when it has more than five rows and the first cell of
    its third row contains one of the ``TABLE_PATTERNS`` labels as a plain
    substring. Qualifying tables are handed to ``process_financial_table``.

    Args:
        pdf_path: Path of the PDF file to open with pdfplumber.
    """
    with pdfplumber.open(pdf_path) as document:
        for pdf_page in document.pages:
            for candidate in pdf_page.extract_tables():
                # Small tables are skipped before the third row is inspected.
                if len(candidate) <= 5:
                    continue
                looks_financial = any(
                    label in str(candidate[2][0])
                    for label in TABLE_PATTERNS.values()
                )
                if looks_financial:
                    process_financial_table(candidate)

def _parse_table_number(cell):
    """Convert a table cell such as "1,234" or "(56)" to a float.

    Returns None for empty cells, dash placeholders and anything that is not
    a number, so callers can skip the cell instead of crashing.
    """
    if cell is None:
        return None
    text = str(cell).strip()
    if text in ("", "-"):
        return None
    # Accounting notation: a value wrapped in parentheses is negative.
    negative = text.startswith("(") and text.endswith(")")
    if negative:
        text = text[1:-1]
    try:
        value = float(text.replace(",", "").strip())
    except ValueError:
        return None
    return -value if negative else value


def process_financial_table(table):
    """Copy FY2024/FY2025 metric values from one extracted table into ``data``.

    The first row is treated as the header: a column whose header text
    contains "2024" or "2025" is mapped to the matching fiscal year. Rows from
    the third one onward are matched against ``TABLE_PATTERNS`` by their first
    cell, and every parseable value is stored under
    ``data["financial_statements"]["annual"][year]["metrics"][metric]``.

    Args:
        table: A list of rows, each a list of cell values (strings or None).

    Returns:
        None. The module-level ``data`` dict is updated in place.
    """
    if not table:
        return

    # Locate the fiscal-year columns. Header cells can be None, so coerce
    # them to text before the substring test.
    col_map = {}
    for i, header in enumerate(table[0]):
        header_text = str(header or "")
        if "2024" in header_text:
            col_map["FY2024"] = i
        if "2025" in header_text:
            col_map["FY2025"] = i
    if not col_map:
        return

    annual = data["financial_statements"]["annual"]
    for row in table[2:]:
        if not row or not row[0]:
            continue

        for metric, pattern in TABLE_PATTERNS.items():
            if not re.search(pattern, row[0]):
                continue
            for year, col in col_map.items():
                # Rows can be shorter than the header row.
                if col >= len(row):
                    continue
                value = _parse_table_number(row[col])
                if value is not None:
                    annual[year]["metrics"][metric] = value

# Text-based extraction (supplements the table extraction)
def extract_text_data(pdf_path_en, pdf_path_cn):
    """Fill ``data`` with figures and risk statements found in the report text.

    From the English PDF it extracts, per fiscal year, the metrics described
    by ``financial_pattern`` and, for FY2025, the per-segment revenue and
    adjusted EBITA described by ``segment_patterns``. From the Chinese PDF it
    collects every sentence that contains one of ``risk_keywords``.

    Args:
        pdf_path_en: Path of the English report PDF.
        pdf_path_cn: Path of the Chinese report PDF.

    Returns:
        None. The module-level ``data`` dict is updated in place.
    """
    # English report: extract each page once, with the same tolerances used
    # for both the emptiness check and the joined text.
    with pdfplumber.open(pdf_path_en) as pdf:
        page_texts = [p.extract_text(x_tolerance=2, y_tolerance=2) for p in pdf.pages]
    full_text_en = "\n".join(text for text in page_texts if text)

    # Supplement the metrics that table extraction did not provide.
    annual = data["financial_statements"]["annual"]
    for year in ("FY2024", "FY2025"):
        # Raw f-string so that \d reaches the regex engine unchanged.
        year_text = re.search(rf"{year}.*?(?=(FY\d|$))", full_text_en, re.DOTALL)
        if not year_text:
            continue
        metrics = annual[year]["metrics"]
        for key, pattern in financial_pattern.items():
            # Table values take priority; text only fills the gaps.
            if key in metrics:
                continue
            match = re.search(pattern, year_text.group())
            if not match:
                continue
            try:
                metrics[key] = float(match.group(1).replace(",", ""))
            except ValueError:
                # The capture group also accepts stray punctuation such as
                # "." or "1.2.3"; skip those instead of aborting the run.
                continue

    # Business-segment figures are kept as strings, as captured.
    for seg, pattern in segment_patterns.items():
        match = re.search(pattern, full_text_en, re.DOTALL)
        if match:
            data["business_segments"]["FY2025"][seg] = {
                "revenue": match.group(1).replace(",", ""),
                "adjusted_EBITA": match.group(2).replace(",", "")
            }

    # Chinese report: risk-factor sentences.
    with pdfplumber.open(pdf_path_cn) as pdf:
        cn_page_texts = [p.extract_text() for p in pdf.pages]
    full_text_cn = "\n".join(text for text in cn_page_texts if text)

    # Split after sentence-ending punctuation, keeping the punctuation.
    sentences = re.split(r'(?<=[。！？；;])', full_text_cn)
    # De-duplicate, then sort so the exported JSON is reproducible.
    data["risk_factors"] = sorted({
        sent.strip() for sent in sentences
        if len(sent) > 8 and any(kw in sent for kw in risk_keywords)
    })

# Main execution flow
def main():
    """Extract report data, write it to a JSON file and trigger a download.

    Table extraction runs first and text extraction second; the accumulated
    module-level ``data`` dict is then dumped to
    ``/content/alibaba_financial_data.json`` and offered for download through
    the Colab ``files`` helper.
    """
    # Tables first, text second.
    extract_financial_tables(pdf_path_en)
    extract_text_data(pdf_path_en, pdf_path_cn)

    # Persist the result.
    json_path = "/content/alibaba_financial_data.json"
    with open(json_path, 'w', encoding="utf-8") as out_file:
        json.dump(data, out_file, ensure_ascii=False, indent=2)

    # Imported here, after the file is written, as in the original flow.
    from google.colab import files
    files.download(json_path)
    print("JSON文件已生成并下载")


if __name__ == "__main__":
    main()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

JSON文件已生成并下载


In [17]:
import os
import pickle
import re
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any, Tuple

import faiss
import numpy as np
from openai import OpenAI

@dataclass
class TextChunk:
    """A piece of text together with its origin, metadata and optional embedding."""
    id: str  # identifier of the chunk
    text: str  # the chunk's text content
    source: str  # origin (e.g. which report, which section)
    metadata: Dict[str, Any]  # free-form extra information about the chunk
    embedding: np.ndarray = None  # None until an embedding vector is assigned
class FinancialReportChunker:
    """Turn financial-report JSON into cleaned, overlapping text chunks.

    Text is split into paragraphs on blank lines, each paragraph is cleaned,
    over-long paragraphs are cut into windows, and the pieces are packed
    greedily into chunks of roughly ``chunk_size`` characters. When a new
    chunk starts, the last ``overlap`` characters of the previous one are
    carried over as context.
    """

    # Punctuation and symbols that clean_text keeps, besides CJK ideographs,
    # word characters and whitespace. Percent signs, currency symbols and the
    # plus sign are kept because stripping them corrupts figures ("15%").
    _KEPT_SYMBOLS = ",.!?;:()（）【】\"'“”‘’、。，！？；：%％+$¥￥€£"
    _DISALLOWED_CHARS = re.compile(
        r"[^\u4e00-\u9fff\w\s" + re.escape(_KEPT_SYMBOLS) + r"-]"
    )
    _PARAGRAPH_BREAK = re.compile(r"\n\s*\n")

    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
        """Store the target chunk size and the overlap, both in characters."""
        self.chunk_size = chunk_size
        self.overlap = overlap

    def extract_text_from_json(self, json_data: Dict) -> Dict[str, str]:
        """Collect every string longer than 10 characters from nested JSON data.

        Args:
            json_data: Arbitrarily nested dicts/lists with string leaves.

        Returns:
            A dict mapping a path such as ``root.a.b`` or ``root.items[0]`` to
            the stripped string found there. Strings stored directly inside
            lists are included as well as strings stored as dict values.
        """
        extracted_texts = {}

        def walk(node, path):
            if isinstance(node, str):
                stripped = node.strip()
                # Very short strings (names, codes, periods) are skipped.
                if len(stripped) > 10:
                    extracted_texts[path] = stripped
            elif isinstance(node, dict):
                for key, value in node.items():
                    walk(value, f"{path}.{key}")
            elif isinstance(node, list):
                for idx, item in enumerate(node):
                    walk(item, f"{path}[{idx}]")

        walk(json_data, "root")
        return extracted_texts

    def clean_text(self, text: str) -> str:
        """Collapse whitespace runs to one space and drop unsupported symbols.

        Kept: CJK ideographs, word characters, whitespace, the hyphen and the
        characters listed in ``_KEPT_SYMBOLS``.
        """
        text = re.sub(r"\s+", " ", text)
        text = self._DISALLOWED_CHARS.sub("", text)
        return text.strip()

    def _split_paragraphs(self, text: str) -> List[str]:
        """Return cleaned, non-empty pieces, none longer than ``chunk_size``."""
        # Windows for over-long paragraphs leave room for the overlap prefix
        # that is added when a new chunk starts.
        window = self.chunk_size - self.overlap
        if window <= 0:
            window = max(1, self.chunk_size)

        pieces = []
        # Split on blank lines BEFORE cleaning: cleaning collapses newlines.
        for raw_paragraph in self._PARAGRAPH_BREAK.split(text):
            paragraph = self.clean_text(raw_paragraph)
            if not paragraph:
                continue
            if len(paragraph) <= self.chunk_size:
                pieces.append(paragraph)
                continue
            for start in range(0, len(paragraph), window):
                piece = paragraph[start:start + window].strip()
                if piece:
                    pieces.append(piece)
        return pieces

    def _pack_paragraphs(self, paragraphs: List[str]) -> List[str]:
        """Greedily pack pieces into chunk texts, carrying an overlap tail forward."""
        chunk_texts = []
        current = ""
        for paragraph in paragraphs:
            if len(current) + len(paragraph) <= self.chunk_size:
                current += paragraph + "\n\n"
                continue

            if current.strip():
                chunk_texts.append(current.strip())

            # Guard overlap == 0 explicitly: current[-0:] is the whole string.
            if self.overlap > 0 and len(current) > self.overlap:
                current = current[-self.overlap:] + paragraph + "\n\n"
            else:
                current = paragraph + "\n\n"

        if current.strip():
            chunk_texts.append(current.strip())
        return chunk_texts

    def smart_chunk_text(self, text: str, source: str) -> "List[TextChunk]":
        """Split ``text`` into chunks and wrap each one in a ``TextChunk``.

        Args:
            text: Raw text; blank lines mark paragraph boundaries.
            source: Label used as ``TextChunk.source`` and as the id prefix.

        Returns:
            Chunks with ids ``{source}_chunk_{n}`` (n starting at 0) and
            metadata keys ``chunk_index``, ``length`` and ``timestamp``.
        """
        chunk_texts = self._pack_paragraphs(self._split_paragraphs(text))
        return [
            TextChunk(
                id=f"{source}_chunk_{index}",
                text=chunk_text,
                source=source,
                metadata={
                    "chunk_index": index,
                    "length": len(chunk_text),
                    "timestamp": datetime.now().isoformat()
                }
            )
            for index, chunk_text in enumerate(chunk_texts)
        ]

    def process_financial_report(self, json_data: Dict, report_name: str) -> "List[TextChunk]":
        """Extract and chunk every text field of one report.

        Args:
            json_data: The report as nested JSON-like data.
            report_name: Name used to prefix each chunk's source label.

        Returns:
            All chunks of the report, in extraction order.
        """
        all_chunks = []
        for source_path, text in self.extract_text_from_json(json_data).items():
            all_chunks.extend(self.smart_chunk_text(text, f"{report_name}_{source_path}"))

        print(f"从 {report_name} 中提取了 {len(all_chunks)} 个文本块")
        return all_chunks

class EmbeddingService:
    """Batching wrapper around the OpenAI embeddings endpoint."""

    def __init__(self, api_key: str, model: str = "text-embedding-ada-002"):
        """Create the API client and remember the embedding model name."""
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.embedding_dim = 1536  # vector size of ada-002

    def get_embeddings(self, texts: List[str], batch_size: int = 100) -> List[np.ndarray]:
        """Embed ``texts`` batch by batch.

        Args:
            texts: Strings to embed.
            batch_size: Number of texts sent per API request.

        Returns:
            One float32 vector per input text, in input order. If a batch
            fails for any reason, the error is printed and zero vectors of
            length ``embedding_dim`` stand in for every text of that batch.
        """
        vectors = []
        total = len(texts)

        for offset in range(0, total, batch_size):
            batch = texts[offset:offset + batch_size]
            try:
                reply = self.client.embeddings.create(
                    input=batch,
                    model=self.model
                )
                # Build the whole batch first so a failure adds nothing partial.
                converted = [np.array(item.embedding, dtype=np.float32)
                             for item in reply.data]
                vectors.extend(converted)
                print(f"已处理 {min(offset + batch_size, total)}/{total} 个文本")
            except Exception as e:
                print(f"批次 {offset//batch_size + 1} 处理失败: {e}")
                # Zero vectors keep positions aligned with the inputs.
                placeholders = [np.zeros(self.embedding_dim, dtype=np.float32)
                                for _ in batch]
                vectors.extend(placeholders)

        return vectors

class FAISSVectorDatabase:
    """FAISS inner-product index plus the chunks it indexes.

    Vectors are L2-normalized before they are added or searched, so the
    inner-product scores are cosine similarities.
    """

    def __init__(self, dimension: int = 1536):
        """Create an empty index for vectors of length ``dimension``."""
        self.dimension = dimension
        self.index = faiss.IndexFlatIP(dimension)  # inner-product similarity
        self.chunks: "List[TextChunk]" = []
        self.chunk_id_to_idx = {}

    def add_chunks(self, chunks: "List[TextChunk]"):
        """Add every chunk that has an embedding to the index.

        Args:
            chunks: Chunks to add; those whose ``embedding`` is None are skipped.

        Raises:
            ValueError: If an embedding's size differs from ``self.dimension``.
        """
        if not chunks:
            return

        valid_chunks = [chunk for chunk in chunks if chunk.embedding is not None]
        if not valid_chunks:
            print("警告: 没有有效的嵌入向量")
            return

        # Validate sizes up front so a mismatch surfaces as a clear error
        # before anything is added to the index.
        rows = []
        for chunk in valid_chunks:
            vector = np.asarray(chunk.embedding, dtype=np.float32).reshape(-1)
            if vector.size != self.dimension:
                raise ValueError(
                    f"嵌入向量维度不匹配: 期望 {self.dimension}，实际 {vector.size} "
                    f"(chunk id: {chunk.id})"
                )
            rows.append(vector)

        # np.stack copies, so normalizing in place leaves chunk.embedding intact.
        embeddings_array = np.stack(rows)
        faiss.normalize_L2(embeddings_array)

        start_idx = len(self.chunks)
        self.index.add(embeddings_array)

        # Keep the chunk list aligned with the row order of the index.
        for i, chunk in enumerate(valid_chunks):
            self.chunks.append(chunk)
            self.chunk_id_to_idx[chunk.id] = start_idx + i

        print(f"已添加 {len(valid_chunks)} 个向量到数据库，总计 {len(self.chunks)} 个")

    def search(self, query_embedding: np.ndarray, k: int = 5) -> "List[Tuple[TextChunk, float]]":
        """Return up to ``k`` chunks most similar to ``query_embedding``.

        Args:
            query_embedding: Query vector; it is copied, not modified.
            k: Maximum number of results. Values <= 0 yield an empty list.

        Returns:
            ``(chunk, score)`` pairs in the order returned by FAISS.
        """
        if not self.chunks or k <= 0:
            return []

        # astype() copies, so the in-place normalization does not touch the
        # caller's array.
        query = query_embedding.reshape(1, -1).astype(np.float32)
        faiss.normalize_L2(query)

        scores, indices = self.index.search(query, min(k, len(self.chunks)))

        results = []
        for score, idx in zip(scores[0], indices[0]):
            # Skip row numbers that do not map to a stored chunk.
            if 0 <= idx < len(self.chunks):
                results.append((self.chunks[idx], float(score)))
        return results

    def save(self, filepath: str):
        """Write the index to ``{filepath}.faiss`` and the chunks to ``{filepath}.pkl``."""
        faiss.write_index(self.index, f"{filepath}.faiss")

        metadata = {
            'chunks': self.chunks,
            'chunk_id_to_idx': self.chunk_id_to_idx,
            'dimension': self.dimension
        }
        with open(f"{filepath}.pkl", 'wb') as f:
            pickle.dump(metadata, f)

        print(f"数据库已保存到 {filepath}")

    def load(self, filepath: str):
        """Replace the current contents with the database stored at ``filepath``.

        SECURITY: the metadata file is read with ``pickle.load``, which can
        execute arbitrary code. Only load files you created yourself.
        """
        self.index = faiss.read_index(f"{filepath}.faiss")

        with open(f"{filepath}.pkl", 'rb') as f:
            metadata = pickle.load(f)  # trusted input only, see docstring

        self.chunks = metadata['chunks']
        self.chunk_id_to_idx = metadata['chunk_id_to_idx']
        self.dimension = metadata['dimension']

        print(f"数据库已从 {filepath} 加载，包含 {len(self.chunks)} 个文本块")

class RAGAnalysisEngine:
    """Facade that wires the chunker, embedding service and vector database together."""

    def __init__(self, openai_api_key: str, chunk_size: int = 1000, overlap: int = 200):
        """Create the three collaborating components."""
        self.chunker = FinancialReportChunker(chunk_size, overlap)
        self.embedding_service = EmbeddingService(openai_api_key)
        self.vector_db = FAISSVectorDatabase()

    def process_financial_reports(self, reports_data: List[Dict], report_names: List[str]):
        """Chunk, embed and index several reports.

        Args:
            reports_data: One JSON-like dict per report.
            report_names: Names paired positionally with ``reports_data``.

        Returns:
            The number of chunks produced.
        """
        # Step 1: split every report into chunks.
        collected = []
        for payload, name in zip(reports_data, report_names):
            collected.extend(self.chunker.process_financial_report(payload, name))

        # Step 2: embed the chunk texts and attach each vector to its chunk.
        vectors = self.embedding_service.get_embeddings([piece.text for piece in collected])
        for piece, vector in zip(collected, vectors):
            piece.embedding = vector

        # Step 3: index the chunks.
        self.vector_db.add_chunks(collected)

        return len(collected)

    def semantic_search(self, query: str, k: int = 5) -> List[Tuple[TextChunk, float]]:
        """Embed ``query`` and return the ``k`` best matching chunks with scores."""
        query_vector = self.embedding_service.get_embeddings([query])[0]
        return self.vector_db.search(query_vector, k)

    def save_database(self, filepath: str):
        """Persist the vector database under ``filepath``."""
        self.vector_db.save(filepath)

    def load_database(self, filepath: str):
        """Load the vector database stored under ``filepath``."""
        self.vector_db.load(filepath)

# Usage example
def main():
    """Index two sample reports, save the database and run demo searches.

    The OpenAI API key is read from the ``OPENAI_API_KEY`` environment
    variable. The placeholder string is only a fallback: requests made with
    it are rejected by the API.
    """
    # Prefer the environment over a key hard-coded in the notebook.
    api_key = os.getenv("OPENAI_API_KEY") or "your-openai-api-key"
    rag_engine = RAGAnalysisEngine(api_key)

    # Sample report JSON data.
    sample_financial_data = [
        {
            "company": "示例公司A",
            "period": "2024Q1",
            "financial_summary": {
                "revenue": "营业收入同比增长15%，达到50亿元人民币。主要增长来源于新产品线的推出和市场份额的扩大。",
                "profit": "净利润为8亿元，同比增长12%。盈利能力持续改善，主要得益于成本控制和运营效率提升。",
                "cash_flow": "经营活动现金流量净额为12亿元，现金流状况良好，为公司持续发展提供了充足的资金支持。"
            },
            "business_analysis": {
                "market_position": "公司在行业中保持领先地位，市场占有率进一步提升至25%。",
                "competitive_advantages": "技术创新能力强，拥有完善的产业链布局和强大的品牌影响力。",
                "risk_factors": "面临原材料价格波动、汇率变化和市场竞争加剧等风险。"
            }
        },
        {
            "company": "示例公司B",
            "period": "2024Q1",
            "financial_summary": {
                "revenue": "本季度营业收入30亿元，同比下降5%。主要受到市场需求疲软和竞争加剧的影响。",
                "profit": "净利润3亿元，同比下降20%。盈利能力面临挑战，需要加强成本管控。",
                "expenses": "销售费用和管理费用占营收比例上升，反映出运营效率有待提升。"
            },
            "strategic_initiatives": {
                "digital_transformation": "加快数字化转型步伐，投资建设智能制造系统和数据分析平台。",
                "market_expansion": "积极拓展海外市场，在东南亚地区新设立3个销售办事处。"
            }
        }
    ]

    report_names = ["CompanyA_2024Q1", "CompanyB_2024Q1"]

    # Chunk, embed and index the sample reports.
    print("开始处理财报数据...")
    total_chunks = rag_engine.process_financial_reports(sample_financial_data, report_names)
    print(f"总共处理了 {total_chunks} 个文本块")

    # Persist the database.
    rag_engine.save_database("financial_reports_db")

    # Try a few semantic searches.
    print("\n测试语义搜索...")
    queries = [
        "营业收入增长情况",
        "现金流状况",
        "市场竞争和风险",
        "数字化转型策略"
    ]

    for query in queries:
        print(f"\n查询: {query}")
        results = rag_engine.semantic_search(query, k=3)

        for i, (chunk, score) in enumerate(results, 1):
            print(f"  结果 {i} (相似度: {score:.3f}):")
            print(f"    来源: {chunk.source}")
            print(f"    内容: {chunk.text[:200]}...")


if __name__ == "__main__":
    main()

开始处理财报数据...
从 CompanyA_2024Q1 中提取了 6 个文本块
从 CompanyB_2024Q1 中提取了 5 个文本块
批次 1 处理失败: Error code: 401 - {'error': {'message': 'Incorrect API key provided: your-ope*******-key. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}
已添加 11 个向量到数据库，总计 11 个
总共处理了 11 个文本块
数据库已保存到 financial_reports_db

测试语义搜索...

查询: 营业收入增长情况
批次 1 处理失败: Error code: 401 - {'error': {'message': 'Incorrect API key provided: your-ope*******-key. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}
  结果 1 (相似度: 0.000):
    来源: CompanyA_2024Q1_root.financial_summary.cash_flow
    内容: 经营活动现金流量净额为12亿元，现金流状况良好，为公司持续发展提供了充足的资金支持。...
  结果 2 (相似度: 0.000):
    来源: CompanyA_2024Q1_root.financial_summary.profit
    内容: 净利润为8亿元，同比增长12。盈利能力持续改善，主要得益于成本控制和运营效率提升。...
  结果 3 (相似度: 0.000):
    来源: CompanyA_2024Q1_root.financial_summary.r

In [11]:

import os
from pathlib import Path
from typing import List, Optional, Dict

from dotenv import load_dotenv, find_dotenv
from pypdf import PdfReader
import docx

# 导入 LangChain 组件
try:
    from langchain_text_splitters import RecursiveCharacterTextSplitter
except:
    from langchain.text_splitter import RecursiveCharacterTextSplitter

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

# Load the API key: first from a .env file, then from Colab Secrets.
load_dotenv(find_dotenv())
if not os.getenv("OPENAI_API_KEY"):
    try:
        from google.colab import userdata
        os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
    except Exception as exc:
        # Covers "not running in Colab" and "secret missing or unreadable".
        # Unlike a bare except, KeyboardInterrupt/SystemExit pass through,
        # and the original cause stays attached to the error.
        raise RuntimeError("未检测到 OPENAI_API_KEY，请在 .env 或 Colab Secrets 中设置。") from exc


class FinancialReportRAG:
    """Build, persist and query FAISS vector stores for PDF/DOCX reports.

    Each store lives in its own directory under ``db_root``. Documents are
    read page by page, split into overlapping chunks and embedded with
    OpenAI embeddings; the page number travels with each chunk as metadata.
    """

    def __init__(
        self,
        embedding_model="text-embedding-3-small",
        chunk_size=1000,
        chunk_overlap=200,
        db_root: Path = Path("./vectorstores")
    ):
        """Configure the text splitter and the embedding client.

        Args:
            embedding_model: Name of the OpenAI embedding model.
            chunk_size: Maximum chunk length in characters.
            chunk_overlap: Characters shared by consecutive chunks.
            db_root: Directory under which vector stores are saved.
        """
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.db_root = db_root

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len
        )
        self.embeddings = OpenAIEmbeddings(model=self.embedding_model)

    # -------- Document loading --------
    def _read_pdf(self, file_path: Path) -> List[Dict]:
        """Return one ``{"page": n, "text": ...}`` dict per PDF page (n from 1)."""
        pages_data = []
        with file_path.open("rb") as f:
            reader = PdfReader(f)
            for i, page in enumerate(reader.pages, start=1):
                # Pages without extractable text become empty strings.
                page_text = page.extract_text() or ""
                pages_data.append({"page": i, "text": page_text})
        return pages_data

    def _read_docx(self, file_path: Path) -> List[Dict]:
        """Return the non-empty paragraphs of a DOCX file as a single page 1."""
        d = docx.Document(str(file_path))
        content = "\n".join(p.text for p in d.paragraphs if p.text)
        return [{"page": 1, "text": content}]

    def load_document(self, file_path: Path) -> List[Dict]:
        """Read a PDF or DOCX file into a list of page dicts.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the suffix is neither ``.pdf`` nor ``.docx``.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        suffix = file_path.suffix.lower()
        if suffix == ".pdf":
            return self._read_pdf(file_path)
        if suffix == ".docx":
            return self._read_docx(file_path)
        raise ValueError("不支持的文件类型，仅支持 PDF 或 DOCX")

    # -------- Chunking --------
    def chunk_document(self, pages_data: List[Dict]) -> List[Dict]:
        """Split each page's text and tag every chunk with its page number.

        Returns:
            Dicts of the form ``{"content": str, "metadata": {"page": int}}``.
        """
        return [
            {"content": chunk, "metadata": {"page": page_data["page"]}}
            for page_data in pages_data
            for chunk in self.text_splitter.split_text(page_data["text"])
        ]

    # -------- Vector store construction --------
    def _db_path(self, db_name: str) -> Path:
        """Return the directory of the store called ``db_name``."""
        return self.db_root / db_name

    def build_or_update_db(self, file_path: Path, db_name: Optional[str] = None):
        """Embed a document and save it into the named vector store.

        If the store already exists, the document's chunks are appended to
        it; calling this twice for the same file therefore indexes its
        chunks twice.

        Args:
            file_path: PDF or DOCX file to index.
            db_name: Store name; defaults to the file name without suffix.

        Returns:
            The FAISS store after saving.

        Raises:
            FileNotFoundError, ValueError: As raised by ``load_document``.
            ValueError: If no text could be extracted from the document.
        """
        pages_data = self.load_document(file_path)
        chunks = self.chunk_document(pages_data)

        texts = [c["content"] for c in chunks]
        metadatas = [c["metadata"] for c in chunks]

        # Stop before creating directories or requesting embeddings: an empty
        # document (e.g. a scanned PDF) cannot be embedded.
        if not texts:
            raise ValueError(f"文档中未提取到任何文本，无法构建向量库: {file_path}")

        db_name = db_name or file_path.stem
        db_dir = self._db_path(db_name)
        self.db_root.mkdir(parents=True, exist_ok=True)

        if db_dir.exists():
            # The store is deserialized with pickle; only load stores that
            # this code created.
            db = FAISS.load_local(
                str(db_dir),
                self.embeddings,
                allow_dangerous_deserialization=True
            )
            db.add_texts(texts, metadatas=metadatas)
        else:
            db = FAISS.from_texts(texts, self.embeddings, metadatas=metadatas)

        db.save_local(str(db_dir))
        print(f"✅ 向量库已保存到 {db_dir}")
        return db

    def load_db(self, db_name: str):
        """Load a saved vector store.

        Raises:
            FileNotFoundError: If the store directory does not exist.
        """
        db_dir = self._db_path(db_name)
        if not db_dir.exists():
            raise FileNotFoundError(f"向量库不存在: {db_dir}")
        # Deserialization uses pickle; only load stores this code created.
        return FAISS.load_local(
            str(db_dir),
            self.embeddings,
            allow_dangerous_deserialization=True
        )

    # -------- Retrieval --------
    def rag_search(self, db_name: str, question: str, keywords: Optional[List[str]] = None, k: int = 5):
        """Retrieve context for a question, optionally widened by keywords.

        A similarity search with ``k`` results is run for each keyword and
        then for the question itself; duplicate chunk texts are dropped,
        keeping the first occurrence.

        Returns:
            The retrieved chunks joined by blank lines, each preceded by a
            ``[Page N]`` line ("?" when the page is unknown).
        """
        db = self.load_db(db_name)

        # Keywords are searched first, the question last.
        queries = list(keywords or []) + [question]

        unique_results = []
        seen_texts = set()
        for query in queries:
            for doc in db.similarity_search(query, k=k):
                if doc.page_content not in seen_texts:
                    seen_texts.add(doc.page_content)
                    unique_results.append(doc)

        context = "\n\n".join(
            f"[Page {doc.metadata.get('page', '?')}]\n{doc.page_content}"
            for doc in unique_results
        )
        return context.strip()


任务三

In [15]:
from pathlib import Path
from openai import OpenAI

# The RAG engine class from task two is already defined as FinancialReportRAG.
rag = FinancialReportRAG()
client = OpenAI()

DB_NAME = "alibaba_en"
PDF_PATH = Path("/content/drive/MyDrive/alibaba_en.pdf")

# Step 1: make sure the vector store exists.
# Build it only when it is missing: build_or_update_db appends to an existing
# store, so calling it on every run would re-embed the whole PDF and index
# duplicate chunks.
if not (rag.db_root / DB_NAME).exists():
    rag.build_or_update_db(PDF_PATH, db_name=DB_NAME)


# Generic helper: RAG retrieval + GPT answer
# ------------------------
def rag_answer(question: str, keywords=None, k=5, db_name=DB_NAME):
    """Answer ``question`` with the chat model, grounded in retrieved report text.

    Args:
        question: The question to answer.
        keywords: Optional extra search phrases passed to the retriever.
        k: Number of results requested per search.
        db_name: Name of the vector store to search.

    Returns:
        The content of the model's first reply choice.
    """
    # Retrieve the supporting context from the vector store.
    context = rag.rag_search(db_name=db_name, question=question, keywords=keywords, k=k)

    # Assemble the prompt: fixed answer format, then context, then question.
    prompt = f"""
你是资深投资分析师，请基于以下财报原文回答问题，输出格式必须严格为：
结论：
理由：
原文证据：（注明来源页码，统一格式 [Page X]）

要求：
- 结论部分简明扼要，投资经理可快速理解
- 理由部分给出逻辑链条，解释为什么得出结论
- 原文证据部分引用财报的原句或数据，且标注页码
- 如果原文不足，请在证据部分明确标记为“原文未直接给出，以下为推断”

-------------------
财报内容：
{context}
-------------------
问题：
{question}
答案：
"""
    # Single-turn request; temperature 0 is passed to the API.
    messages = [{"role": "user", "content": prompt}]
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0
    )
    first_choice = completion.choices[0]
    return first_choice.message.content


# ------------------------
# Dimension 1: core growth drivers and moat
# ------------------------
# NOTE(review): the keywords below are Chinese while the store is built from
# the English PDF — confirm the embedding model retrieves well across
# languages.
question_1 = """
根据财报中各业务分部的描述和数据，分析一下“阿里电商”作为核心业务，其护城河主要体现在哪些方面？
请结合“全球拓展”、“用户基础”、“商家生态”、“平台技术”等角度，并从财报中找到原文证据。
"""
护城河_答案 = rag_answer(
    question=question_1,
    keywords=["阿里电商 护城河", "全球拓展", "用户基础", "商家生态", "平台技术"]
)
print("\n【维度一：核心驱动力与护城河】\n", 护城河_答案)


# ------------------------
# Dimension 2: business model and market positioning
# ------------------------
# NOTE(review): the question refers to the data extracted in task one, but
# rag_answer only passes retrieved PDF text to the model — the extracted JSON
# is not part of the prompt. Confirm whether it should be included.
question_2 = """
请用任务一提取的业务数据，计算出FY2025财年，云智能集团和阿里国际数字商业集团的收入分别占总收入的百分比。
并结合财报中的关键信息，分析哪个分部是阿里当前最重要的“第二增长曲线”？理由是什么？
"""
商业模式_答案 = rag_answer(
    question=question_2,
    keywords=["云智能集团 收入", "阿里国际数字商业集团 收入", "FY2025 总收入"]
)
print("\n【维度二：商业模式与市场卡位】\n", 商业模式_答案)


# ------------------------
# Dimension 3: potential risks and red flags
# ------------------------
question_3 = """
请从“风险”角度，提炼出管理层认为的关于“宏观经济与市场竞争”和“数据安全与法规监管”这两大类的主要风险描述。
然后，请尝试回答：财报中的哪些财务数据（如收入增长放缓、利润率下降等）可能已经初步印证了“市场竞争”加剧的风险？
"""
风险_答案 = rag_answer(
    question=question_3,
    keywords=["宏观经济 风险", "市场竞争 风险", "数据安全 风险", "法规监管 风险"]
)
print("\n【维度三：潜在风险与红旗警报】\n", 风险_答案)


✅ 向量库已保存到 vectorstores/alibaba_en

【维度一：核心驱动力与护城河】
 结论：
阿里电商的护城河主要体现在全球拓展、用户基础、商家生态和平台技术四个方面。

理由：
1. **全球拓展**：阿里电商在国际市场上持续扩展，特别是在欧洲和海湾地区，通过多样化的产品和商业模式增强竞争优势。
2. **用户基础**：阿里电商的高价值用户群体（如88VIP会员）持续增长，增强了平台的用户粘性和消费能力。
3. **商家生态**：阿里电商致力于改善商家的运营环境，提供多种支持，促进商家的可持续发展，从而形成良好的商家生态。
4. **平台技术**：阿里云在AI和云计算领域的技术领先地位，推动了平台的创新和用户体验的提升，进一步巩固了其市场领导地位。

原文证据：
- 全球拓展：“AIDC has a diverse geographical presence, with a consistent strategic focus on key regions such as select European markets and the Gulf Region.” [Page 10]
- 用户基础：“The number of 88VIP members, our highest spending consumer group, continued to increase by double digits year-over-year, surpassing 50 million.” [Page 9]
- 商家生态：“We remained focused on improving their operating environment and ensuring their sustainable development on our platform.” [Page 9]
- 平台技术：“Alibaba Cloud was the only Chinese provider named an Emerging Leader in all four areas: Generative AI Model Providers, Generative AI Engineering, Generative AI Specialized Cloud Infr