In [1]:
import functools
import json
import os
import re
import time
from http import HTTPStatus

import dashscope
import spacy
from pymongo import MongoClient

# ================= Configuration =================

# 1. Alibaba Qwen (DashScope) API key.
# SECURITY: credentials do not belong in source control. The environment
# variable takes precedence; the literal fallback is kept only so existing
# runs keep working. Rotate this key and remove the fallback.
dashscope.api_key = os.environ.get(
    'DASHSCOPE_API_KEY', 'sk-fa13f585000140deabdfa506b25a7f3d'
)

# 2. MongoDB configuration. Every value can be overridden from the
# environment; the literals below are the previous hard-coded defaults.
# SECURITY: the default URI embeds a username and password.
MONGO_URI = os.environ.get(
    'MONGO_URI',
    'mongodb://admin:12345678@192.168.16.138:27017/?authSource=admin',
)
DB_NAME = os.environ.get('MONGO_DB_NAME', "test")
COLLECTION_NAME = os.environ.get(
    'MONGO_COLLECTION_NAME', "overseas_website_data_all"
)

# =================================================

# Module-level handles used by the tagging test further down.
client = MongoClient(MONGO_URI)
db = client[DB_NAME]
source_collection = db[COLLECTION_NAME]

# --- China-related keywords (list content unchanged) ---
CHINA_KEYWORDS = [
    # Country and government
    "China", "Chinese", "Beijing", "PRC", "People's Republic of China",
    "Mainland China", "State Council", "Zhongnanhai", "CCP",
    "Communist Party of China", "Xi Jinping", "Politburo",
    # Economy and initiatives
    "Yuan", "RMB", "Renminbi", "Belt and Road", "BRI", "Made in China",
    # Companies
    "Huawei", "Tencent", "Alibaba", "ByteDance", "TikTok",
    # Military
    "PLA", "People's Liberation Army",
    # Places and regions
    "Shanghai", "Shenzhen", "Hong Kong", "Macau", "Macao", "Taiwan", "Taipei",
    "Xinjiang", "Tibet", "South China Sea",
    # Prefix, compiled separately below without a trailing word boundary
    "Sino-",
]

# One case-insensitive pattern per keyword. "Sino-" goes first and only gets a
# leading word boundary so that it matches as a prefix ("Sino-American");
# every other keyword must match as a whole word or phrase.
CHINA_KEYWORD_REGEX = [re.compile(r'\bsino-', re.IGNORECASE)]
CHINA_KEYWORD_REGEX.extend(
    re.compile(r'\b{}\b'.format(re.escape(keyword.lower())), re.IGNORECASE)
    for keyword in CHINA_KEYWORDS
    if keyword != "Sino-"
)

# Keyword taxonomy for the "strategic resources" tag.
#
# Structure:
#   second-level tag -> {
#       "base_keywords": generic terms for the whole category,
#       "detailed_tags": {third-level tag -> keywords for that sub-category},
#   }
#
# The tag names are Chinese runtime strings (they are printed and sent to the
# LLM as the target category), so they must not be translated here.
# Many entries are one- or two-letter element symbols ("La", "W", "U", ...);
# these are highly ambiguous in running text.
STRATEGIC_RESOURCES_CONFIG = {
    # Rare earths
    "稀土": {
        "base_keywords": ["rare earth", "rare earth elements", "REE"],
        "detailed_tags": {
            # Light rare earth elements
            "轻稀土": ["Light Rare Earth Elements", "LREE", "Lanthanum", "La", "Cerium", "Ce", "Praseodymium", "Pr", "Neodymium", "Nd", "Promethium", "Pm", "Samarium", "Sm", "Europium", "Eu"],
            # Heavy rare earth elements
            "重稀土": ["Heavy Rare Earth Elements", "HREE", "Gadolinium", "Gd", "Terbium", "Tb", "Dysprosium", "Dy", "Holmium", "Ho", "Erbium", "Er", "Thulium", "Tm", "Ytterbium", "Yb", "Lutetium", "Lu", "Yttrium", "Y", "Scandium", "Sc"],
            # Permanent-magnet materials
            "永磁材料": ["Permanent Magnets", "NdFeB", "Neodymium Iron Boron", "SmCo", "Samarium Cobalt", "rare earth magnet"],
            # Mining and separation technology
            "开采与分离技术": ["Mining Technology", "extraction technology", "Separation & Purification", "rare earth separation", "purification process", "solvent extraction"],
            # Recycling
            "回收利用": ["Recycling", "rare earth recycling", "urban mining"]
        }
    },
    # Minerals
    "矿产": {
        "base_keywords": ["mineral", "minerals", "critical minerals", "strategic minerals"],
        "detailed_tags": {
            # Energy minerals
            "能源矿产": ["Energy Minerals", "Uranium", "U", "Thorium", "Th"],
            # Metallic minerals
            "金属矿产": [
                "Metallic Minerals", "Lithium", "Li", "Cobalt", "Co", "Nickel", "Ni",
                "Tungsten", "W", "Tin", "Sn", "Antimony", "Sb", "Beryllium", "Be",
                "Niobium", "Nb", "Tantalum", "Ta", "Zirconium", "Zr"
            ],
            # Non-metallic minerals
            "非金属矿产": ["Non-metallic Minerals", "Fluorite", "Graphite", "Quartz"],
            # Mining technology
            "采矿技术": ["Mining technology", "deep sea mining", "open-pit mining"]
        }
    },
    # Energy
    "能源": {
        "base_keywords": ["energy", "energy resources"],
        "detailed_tags": {
            # Conventional energy
            "传统能源": ["Tradition Energy", "Petroleum", "Oil", "Coal", "Natural Gas", "LNG", "Fossil fuel"],
            # New / renewable energy
            "新能源": ["New Energy", "Renewables", "Renewable energy", "Hydrogen Energy", "Solar Power", "Photovoltaic", "Nuclear Power", "Wind Power", "Energy Storage", "Battery storage"],
            # Energy transport
            "能源运输": ["Energy Transport", "pipeline", "oil tanker", "LNG carrier", "energy grid", "transmission line"]
        }
    }
}

_SENTENCE_END_RE = re.compile(r'[.?!]')


def _last_terminator_before(text, stop):
    """Return the index of the last '.', '?' or '!' in text[:stop], or -1."""
    if stop <= 0:
        return -1
    return max(text.rfind(ch, 0, stop) for ch in '.?!')


def _first_terminator_from(text, start):
    """Return the index of the first '.', '?' or '!' at or after start, or -1."""
    found = _SENTENCE_END_RE.search(text, start)
    return found.start() if found else -1


def extract_expanded_context(text, match_obj, max_chars=800):
    """Return the previous, current and next sentence around a keyword match.

    Sentence boundaries are detected naively: every '.', '?' or '!' ends a
    sentence, so abbreviations such as "U.S." also split. All whitespace runs
    in the result are collapsed to single spaces.

    Args:
        text: The full text that was searched.
        match_obj: A regex match (anything exposing ``start()`` and ``end()``)
            locating the keyword inside ``text``.
        max_chars: Upper bound for the three-sentence snippet. When the
            snippet is longer (typically because punctuation is missing), a
            character window centred on the keyword is returned instead,
            wrapped in "..." markers. The window is at most 300 characters on
            each side and shrinks so that the window plus keyword never
            exceeds ``max_chars`` (the "..." markers are not counted).

    Returns:
        The cleaned context string.
    """
    keyword_start = match_obj.start()
    keyword_end = match_obj.end()
    text_len = len(text)

    # --- 1. Current sentence ---
    # Look backwards from the keyword's first character (inclusive); the
    # sentence starts right after the terminator found, or at 0.
    curr_sent_start = _last_terminator_before(text, keyword_start + 1) + 1

    # Look forwards from the end of the keyword; the terminator itself is
    # part of the current sentence.
    terminator = _first_terminator_from(text, keyword_end)
    curr_sent_end = terminator + 1 if terminator != -1 else text_len

    # --- 2. Previous sentence ---
    # text[curr_sent_start - 1] is the terminator closing the previous
    # sentence, so the search stops just before it.
    prev_sent_start = _last_terminator_before(text, curr_sent_start - 1) + 1

    # --- 3. Next sentence ---
    # The first character after the current sentence is skipped, matching the
    # original scan; this steps over the second mark of "?!" or "..".
    next_sent_end = text_len
    if curr_sent_end < text_len:
        terminator = _first_terminator_from(text, curr_sent_end + 1)
        if terminator != -1:
            next_sent_end = terminator + 1

    # --- 4. Slice and clean up ---
    expanded_text = text[prev_sent_start:next_sent_end].strip()

    if len(expanded_text) > max_chars:
        # Fall back to a plain character window around the keyword. The
        # window honours max_chars; with the default of 800 it stays at the
        # historical 300 characters per side.
        keyword_len = keyword_end - keyword_start
        window = min(300, max(0, (max_chars - keyword_len) // 2))
        start = max(0, keyword_start - window)
        end = min(text_len, keyword_end + window)
        expanded_text = "..." + text[start:end] + "..."

    # Collapse newlines and repeated blanks.
    return re.sub(r'\s+', ' ', expanded_text)

def _qwen_failure(reason):
    """Serialize a negative verdict in the JSON shape the model is asked for."""
    # json.dumps escapes quotes, backslashes and newlines in the reason, so
    # the payload stays parseable whatever the error message contains.
    return json.dumps({"is_match": False, "reason": reason}, ensure_ascii=False)


def verify_with_qwen(keyword, context, tag_name):
    """Ask the Qwen model whether ``keyword`` really refers to ``tag_name``.

    The prompt tells the model that ``context`` contains the sentences
    surrounding the keyword and asks for a JSON object with the fields
    ``is_match`` (bool) and ``reason`` (one Chinese sentence).

    Args:
        keyword: The keyword found in the document.
        context: Text snippet around the keyword.
        tag_name: Target category the keyword is supposed to belong to.

    Returns:
        A string. On success it is the model's reply with Markdown code-fence
        markers removed (expected, but not guaranteed, to be JSON). On a
        non-OK API status or any exception it is a valid JSON document with
        ``is_match`` set to false and the error in ``reason``. This function
        does not raise.
    """
    prompt = f"""
    你是一个专业的战略资源情报分析师。
    
    【任务】
    判断提供的文本片段中，特定的“关键词”是否在语义上属于“目标分类”。
    
    【输入信息】
    1. 目标分类: "{tag_name}"
    2. 待分析关键词: "{keyword}"
    3. 文本片段 (包含关键词的前后句上下文): 
    "{context}"

    【判断逻辑】
    请阅读整个文本片段，分析该关键词的含义。
    - 如果关键词明确指代"{tag_name}"（例如 'Li' 指代锂元素，'Co' 指代钴矿），返回 true。
    - 如果关键词是缩写、人名的一部分、或其他普通单词（例如 'Co' 是 Co-operation, 'W' 是 George W. Bush），返回 false。
    - 如果语境完全无关，返回 false。

    【输出格式】
    请仅返回标准 JSON 字符串：
    {{
        "is_match": true,
        "reason": "请用一句话中文解释，例如：根据后文提到的电池生产，这里的Li指代锂资源。"
    }}
    """

    try:
        response = dashscope.Generation.call(
            model='qwen-turbo',  # the turbo model is fast and sufficient here
            prompt=prompt,
            result_format='message',
        )

        if response.status_code != HTTPStatus.OK:
            return _qwen_failure(f"API Error: {response.code}")

        content = response.output.choices[0].message.content
        # Strip Markdown code-fence markers the model may wrap the JSON in.
        return content.replace('```json', '').replace('```', '').strip()

    except Exception as e:
        # Deliberate best-effort boundary: a failed verification must not
        # abort the scan, so every error becomes a negative verdict.
        return _qwen_failure(f"Exception: {str(e)}")

# Load the lightweight English spaCy pipeline.
# It can be installed up front with: python -m spacy download en_core_web_sm
def _load_spacy_pipeline(model_name):
    """Load a spaCy pipeline, downloading it once if spaCy reports OSError."""
    try:
        return spacy.load(model_name)
    except OSError:
        # spacy.load failed; fetch the package and retry.
        print("正在下载 spacy 模型...")
        from spacy.cli import download
        download(model_name)
        return spacy.load(model_name)


nlp = _load_spacy_pipeline("en_core_web_sm")

@functools.lru_cache(maxsize=1)
def _parse_text(text):
    """Run the spaCy pipeline on ``text``, remembering the latest result.

    Callers typically check many keyword lists against the same document;
    caching the most recent parse avoids re-running the pipeline each time.
    Only one parsed document is kept, so memory stays bounded.
    """
    return nlp(text)


def _passes_token_filters(doc, token, keyword):
    """Return True when a single-token hit looks like a real keyword use."""
    # Rules A/B: skip tokens tagged as part of a person, place or
    # organisation name.
    if token.ent_type_ in ("PERSON", "GPE", "ORG"):
        return False

    # Rule C: skip verbs and common function words.
    if token.pos_ in ("VERB", "DET", "ADP", "PRON", "AUX"):
        return False

    if len(keyword) <= 2:
        # Rule D: a short symbol directly followed by "." is most likely an
        # initial or abbreviation ("W." in a name). Limited to short keywords
        # so that ordinary sentence-final hits ("... exports of cobalt.") are
        # no longer discarded.
        next_index = token.i + 1
        if next_index < len(doc) and doc[next_index].text == '.':
            return False

        # Rule E: element symbols are capitalised ("Li", not "li").
        if not token.text[0].isupper():
            return False

    return True


def _sentence_window(doc, sentence):
    """Return previous + current + next sentence text, whitespace-collapsed."""
    parts = []
    if sentence.start > 0:
        # The token just before this sentence belongs to the previous one.
        parts.append(doc[sentence.start - 1].sent.text)
    parts.append(sentence.text)
    if sentence.end < len(doc):
        # The token just after this sentence belongs to the next one.
        parts.append(doc[sentence.end].sent.text)
    return re.sub(r'\s+', ' ', " ".join(parts).strip())


def check_keywords_with_spacy(text, keywords):
    """Find keywords in ``text`` and return them with sentence context.

    Matching is case-insensitive and token-based, and supports multi-word
    keywords ("rare earth", "Natural Gas") as well as single tokens.
    Single-token hits additionally go through part-of-speech and entity
    heuristics to weed out obvious false positives; multi-word phrases are
    accepted as they are.

    Args:
        text: Document text. Only the first 1,000,000 characters are parsed,
            to keep memory use bounded on very large inputs.
        keywords: Iterable of keyword strings.

    Returns:
        A list of ``(keyword, context)`` tuples in document order, at most one
        per keyword (the first accepted occurrence). ``keyword`` is spelled as
        in ``keywords``; ``context`` is the previous, current and next
        sentence joined by single spaces.
    """
    # Local import: only this function needs the matcher.
    from spacy.matcher import PhraseMatcher

    # Lower-cased keyword -> spelling from the configuration.
    target_kws = {kw.lower(): kw for kw in keywords}
    if not text or not target_kws:
        return []

    doc = _parse_text(text[:1000000])

    # attr="LOWER" compares lower-cased token texts. Patterns only need
    # tokenisation, so nlp.make_doc is enough.
    matcher = PhraseMatcher(nlp.vocab, attr="LOWER")
    for kw_lower, original_kw in target_kws.items():
        pattern = nlp.make_doc(original_kw)
        if len(pattern) > 0:
            matcher.add(kw_lower, [pattern])

    found_items = []
    seen_kws = set()

    # Sort so that results follow document order.
    for match_id, start, end in sorted(matcher(doc), key=lambda m: (m[1], m[2])):
        kw_lower = nlp.vocab.strings[match_id]
        if kw_lower in seen_kws:
            continue

        original_kw = target_kws[kw_lower]

        if end - start == 1 and not _passes_token_filters(doc, doc[start], original_kw):
            continue

        context = _sentence_window(doc, doc[start].sent)
        found_items.append((original_kw, context))
        seen_kws.add(kw_lower)

    return found_items

def _is_china_related(text):
    """Return True when any China keyword pattern occurs in ``text``."""
    lowered = text.lower()  # lower-case once instead of once per pattern
    return any(regex.search(lowered) for regex in CHINA_KEYWORD_REGEX)


def _collect_resource_matches(full_text):
    """Return per-category keyword hits for one document.

    The result maps each second-level tag that has at least one hit to
    ``{"base": [(kw, ctx), ...], "detailed": {third-level tag: [(kw, ctx), ...]}}``.
    """
    doc_matches = {}
    for level_2_tag, config in STRATEGIC_RESOURCES_CONFIG.items():
        base_hits = check_keywords_with_spacy(full_text, config["base_keywords"])

        detailed_hits = {}
        for level_3_tag, sub_keywords in config["detailed_tags"].items():
            hits = check_keywords_with_spacy(full_text, sub_keywords)
            if hits:
                detailed_hits[level_3_tag] = hits

        if base_hits or detailed_hits:
            doc_matches[level_2_tag] = {"base": base_hits, "detailed": detailed_hits}
    return doc_matches


def _print_ai_verdict(kw, ctx, level_3):
    """Ask the LLM to verify one keyword hit and print its conclusion."""
    ai_result_json = verify_with_qwen(kw, ctx, level_3)

    try:
        res = json.loads(ai_result_json)
        is_match = res.get("is_match")
        reason = res.get("reason")
    except (ValueError, TypeError, AttributeError):
        # ValueError: reply is not JSON; TypeError: reply is not a string;
        # AttributeError: reply is JSON but not an object.
        print(f"      │           ⚠️ AI JSON 解析失败: {ai_result_json}")
    else:
        icon = "✅" if is_match else "❌"
        print(f"      │           {icon} AI 结论: {reason}")


def _print_document_report(index, title, link, is_china_related, doc_matches):
    """Print the tag tree for one matched document, verifying detailed hits."""
    print(f"[{index}] 文档: {title[:60]}...")
    print(f"   Link: {link}")
    print(f"   [中国相关]: {'是' if is_china_related else '否'}")
    print(f"   ★ 一级标签: 【战略资源】")

    for level_2, data in doc_matches.items():
        base_tuples = data['base']
        detailed_dict = data['detailed']

        print(f"      ├── 二级标签: 【{level_2}】")

        if not detailed_dict and not base_tuples:
            print("      │     (无匹配)")
        elif base_tuples and not detailed_dict:
            base_names = [kw for kw, _ in base_tuples]
            print(f"      │     [仅原有关键词匹配]: {base_names}")

        if detailed_dict:
            print(f"      │     [新增三级标签匹配 & AI 分析]:")
            for level_3, hits in detailed_dict.items():
                print(f"      │       + 三级分类: <{level_3}>")
                for kw, ctx in hits:
                    print(f"      │           --------------------------------------------------")
                    print(f"      │           关键词: [{kw}]")
                    print(f"      │           上下文: \"{ctx}\"")

                    _print_ai_verdict(kw, ctx, level_3)

                    # Small pause between API calls to stay under rate limits.
                    time.sleep(0.3)

        print("      │")
    print("=" * 90)


def test_strategic_resource_tagging():
    """Run the tagging pipeline on up to 20 documents and print a report.

    For each document read from ``source_collection`` the title and content
    are joined, checked for China-related keywords and matched against
    ``STRATEGIC_RESOURCES_CONFIG``. Every third-level keyword hit is sent to
    the LLM for verification. Nothing is written back to the database and
    nothing is returned; all results go to stdout.
    """
    print("--- 开始测试：三级关键词匹配 + 前后句上下文 + AI 深度验证 ---\n")
    print("提示：系统将自动提取 [前一句 + 当前句 + 后一句] 发送给大模型进行分析。\n")

    cursor = source_collection.find({}, {"title": 1, "content": 1, "link": 1}).limit(20)

    total_processed = 0
    matched_docs = 0

    for doc in cursor:
        total_processed += 1
        # Missing or null fields become empty strings so that they neither
        # inject the text "None" nor break the title slicing in the report.
        title = str(doc.get("title") or "")
        content = str(doc.get("content") or "")
        link = doc.get("link", "")

        # Skip documents with no text at all. Testing the joined string is
        # not enough because it always contains the ". " separator.
        if not title.strip() and not content.strip():
            continue

        full_text = (title + ". " + content).strip()

        is_china_related = _is_china_related(full_text)
        doc_matches = _collect_resource_matches(full_text)

        if doc_matches:
            matched_docs += 1
            _print_document_report(
                total_processed, title, link, is_china_related, doc_matches
            )

    print(f"\n测试结束。")
    print(f"共扫描文档: {total_processed}")
    print(f"命中【战略资源】相关文档: {matched_docs}")

if __name__ == "__main__":
    # Script entry point: any failure is reported as a single line on stdout
    # rather than as a traceback.
    try:
        test_strategic_resource_tagging()
    except Exception as exc:
        print(f"程序运行出错: {exc}")

--- 开始测试：三级关键词匹配 + 前后句上下文 + AI 深度验证 ---

提示：系统将自动提取 [前一句 + 当前句 + 后一句] 发送给大模型进行分析。

[7] 文档: Global Climate Agreements: Successes and Failures...
   Link: https://www.cfr.org/backgrounder/paris-global-climate-change-agreements
   [中国相关]: 是
   ★ 一级标签: 【战略资源】
      ├── 二级标签: 【能源】
      │     [新增三级标签匹配 & AI 分析]:
      │       + 三级分类: <传统能源>
      │           --------------------------------------------------
      │           关键词: [Oil]
      │           上下文: "The United States and EU introduced aat COP26, which aims to slash 30 percent of methane emissions levels between 2020 and 2030. At COP28, oil companies announced they would cut their methane emissions from wells and drilling by more than 80 percent by the end of the decade. However, pledges to phase out fossil fuels were not renewed the following year at COP29.Most experts say that countries’ pledges are not ambitious enough and will not be enacted quickly enough to limit global temperature rise to 1.5°C."
      │           ✅ AI 结论: 