# 步骤 1: 准备工作与环境设置

**目标:** 导入所有需要的库，并设置好文件路径和全局变量。

In [1]:
import html
# --- 导入库与全局配置 ---
import json
import os
import re
import time

import pandas as pd
import psutil  # 用于智能检测系统资源
import spacy
import unicodedata
# --- 新增的优化库 ---
from flashtext import KeywordProcessor
from tqdm.auto import tqdm  # 使用tqdm.auto以获得更好的显示效果

# --- 全局文件与参数配置 ---
ALIYUN_OSS_PATH = ''  #ALIYUN_OSS_PATH = '/mnt/data/scripts/'
KEYWORD_JSON_PATH = os.path.join(ALIYUN_OSS_PATH, '../data_raw/china_keywords_collection.json')
SOURCE_NEWS_FILE = os.path.join(ALIYUN_OSS_PATH, '../data_raw/final_merged_all_news.csv')
CANDIDATES_FILE = os.path.join(ALIYUN_OSS_PATH, '../data_processed/china_news_candidates.csv')
FINAL_RESULT_FILE = os.path.join(ALIYUN_OSS_PATH, '../data_processed/final_china_news.csv')
REJECTED_FILE = os.path.join(ALIYUN_OSS_PATH, '../data_processed/china_news_rejected_articles.csv')

# --- 全局处理参数 ---
NEWS_COLUMN = 'CONTENT'
CHUNKSIZE = 20000  # 适当增大块大小，以减少IO次数

# --- 智能配置并行参数 ---
cpu_cores = psutil.cpu_count(logical=False) # 使用物理核心数
N_PROCESSES = min(cpu_cores - 1 if cpu_cores > 1 else 1, 8) # 使用核心数-1，但最多不超过8个
if N_PROCESSES < 1: N_PROCESSES = 1
BATCH_SIZE = 500 # spaCy并行处理时的批大小

print("✅ 块 1: 库导入和配置完成。")
print(f"   - 将使用 Flashtext 进行高效初筛。")
print(f"   - 将使用 {N_PROCESSES} 个进程进行并行精筛。")

✅ 块 1: 库导入和配置完成。
   - 将使用 Flashtext 进行高效初筛。
   - 将使用 5 个进程进行并行精筛。


# 步骤 2: 初筛准备 - 构建智能正则表达式

**目标:** 读取 中国相关关键词 JSON 文件，并执行我们讨论过的所有逻辑来构建一个强大、高效的正则表达式。

In [2]:
# --- 构建初筛用的 Flashtext 关键词处理器 ---

def build_keyword_processor(json_path):
    """
    从关键词 JSON 文件中构建一个高效的 Flashtext KeywordProcessor。
    """
    print(f"正在从 {json_path} 加载关键词...")
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            keywords_data = json.load(f)
    except FileNotFoundError:
        print(f"❌ 错误: 关键词文件未找到 {json_path}")
        return None, None

    print(f"共加载 {len(keywords_data)} 个关键词对象。")

    # 1. 提取全部关键词和别名
    all_aliases = set()
    for item in keywords_data:
        all_aliases.add(item['keyword'])
        for alias in item.get('aliases', []):
            all_aliases.add(alias)
    print(f"提取出 {len(all_aliases)} 个不重复的关键词/别名。")

    # 2. 初始化 Flashtext 处理器并添加关键词
    # case_sensitive=False 使其不区分大小写
    keyword_processor = KeywordProcessor(case_sensitive=False)
    for kw in all_aliases:
        keyword_processor.add_keyword(kw)

    print("✅ 高效关键词处理器 (Flashtext) 构建完成。")
    return keyword_processor, keywords_data


# 执行构建
keyword_processor, keywords_data = build_keyword_processor(KEYWORD_JSON_PATH)

# 将关键词处理器设为全局变量，以便后续子进程可以访问 (在某些系统上需要)
global_keyword_processor = keyword_processor

print("\n✅ 块 2: 初筛准备工作完成。")

正在从 ../data_raw/china_keywords_collection.json 加载关键词...
共加载 274 个关键词对象。
提取出 394 个不重复的关键词/别名。
✅ 高效关键词处理器 (Flashtext) 构建完成。

✅ 块 2: 初筛准备工作完成。


# 步骤 3: 执行阶段一 - 调用外部脚本进行快速初筛

**目标:** 对大文件进行分块扫描，应用正则表达式，并保存候选集。这将是整个流程中最耗时的部分。

In [3]:
# --- 单元格 3 (最终优化版) ---

# --- 定义并行处理函数 ---

def lightweight_clean(text):
    """一个非常轻量级的文本清理函数。"""
    if not isinstance(text, str):
        return ""
    text = re.sub('<[^>]*>', '', text)
    text = html.unescape(text)
    text = unicodedata.normalize('NFKC', text)
    text = re.sub(r'https?://\S+|www\.\S+', '', text)
    text = re.sub(r'\S+@\S+', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

def process_chunk(chunk_df):
    """
    处理单个数据块的核心任务：清理文本并使用Flashtext筛选。
    这个函数将在每个子进程中独立运行。
    """
    if NEWS_COLUMN not in chunk_df.columns:
        return pd.DataFrame() # 返回空DataFrame

    # 清理文本
    cleaned_series = chunk_df[NEWS_COLUMN].astype(str).apply(lightweight_clean)

    # 使用在步骤2中创建的全局 keyword_processor 进行查找
    mask = cleaned_series.apply(lambda x: len(keyword_processor.extract_keywords(x)) > 0)

    # 返回匹配到的候选行
    return chunk_df[mask]

# --- 执行阶段一 - 流式并行扫描与初筛 ---
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

print("--- 阶段一: 开始使用多进程进行高效流式初筛 ---")
start_time = time.time()

try:
    # 1. 创建CSV文件迭代器，这是流式处理的关键
    chunk_iterator = pd.read_csv(SOURCE_NEWS_FILE, chunksize=CHUNKSIZE, on_bad_lines='skip', low_memory=False)

    is_first_chunk = True
    total_candidates = 0

    # 预计算总块数以提供准确的进度条
    print("正在计算文件总块数...")
    num_lines = sum(1 for row in open(SOURCE_NEWS_FILE, 'r', encoding='utf-8', errors='ignore'))
    total_chunks = (num_lines // CHUNKSIZE) + 1
    print(f"文件约包含 {total_chunks} 个数据块。将使用 {N_PROCESSES} 个进程并行处理...")

    # 2. 使用 ProcessPoolExecutor 进行流式并行处理
    with ProcessPoolExecutor(max_workers=N_PROCESSES) as executor:

        # 3. 使用 executor.map 将迭代器中的数据块流式地分发给子进程
        # executor.map 会自动处理任务分发和结果收集，并保持顺序
        results_iterator = executor.map(process_chunk, chunk_iterator)

        # 4. 使用tqdm包装结果迭代器，以显示总体进度
        for candidates_df in tqdm(results_iterator, total=total_chunks, desc="并行初筛中"):
            if not candidates_df.empty:
                total_candidates += len(candidates_df)
                # 写入到CSV文件
                if is_first_chunk:
                    candidates_df.to_csv(CANDIDATES_FILE, index=False, mode='w', encoding='utf-8')
                    is_first_chunk = False
                else:
                    candidates_df.to_csv(CANDIDATES_FILE, index=False, mode='a', header=False, encoding='utf-8')

    end_time_stage1 = time.time()
    print("\n--- 初筛流程执行完毕 ---")
    print(f"总共找到 {total_candidates} 篇候选文章，已保存到 {CANDIDATES_FILE}")
    print(f"阶段一 (并行初筛) 耗时: {(end_time_stage1 - start_time) / 60:.2f} 分钟。")

except FileNotFoundError:
    print(f"❌ 错误: 原始新闻文件未找到 {SOURCE_NEWS_FILE}")
except Exception as e:
    print(f"❌ 处理过程中发生错误: {e}")

print("\n✅ 块 3: 初筛流程执行完毕。")

--- 阶段一: 开始使用多进程进行高效流式初筛 ---
正在计算文件总块数...
文件约包含 754 个数据块。将使用 5 个进程并行处理...
❌ 处理过程中发生错误: A child process terminated abruptly, the process pool is not usable anymore

✅ 块 3: 初筛流程执行完毕。


# 步骤 4: 精筛准备 - 加载模型与定义规则

**目标:** 负责加载 spaCy 模型和数据，并定义所有用于精筛的“否决规则”函数。

In [4]:
# --- 精筛准备 - 加载模型与定义否决规则 ---
print("--- 阶段二准备: 加载 spaCy 模型 ---")

# 禁用所有不需要的组件，只保留'parser'用于句法分析 (token.dep_, token.head, doc.sents)
# 这是对内存和速度的巨大优化
try:
    nlp = spacy.load("en_core_web_lg", disable=["ner", "lemmatizer", "tagger", "attribute_ruler"])
    print(f"✅ spaCy 模型 '{nlp.meta['name']}' 的核心组件加载成功。")
except OSError:
    print("错误: spaCy模型 'en_core_web_lg' 未安装。")
    print("请在你的终端或命令行中运行: python -m spacy download en_core_web_lg")
    nlp = None

if nlp:
    # --- 准备 PhraseMatcher ---
    print("正在准备 PhraseMatcher...")
    from spacy.matcher import PhraseMatcher
    matcher = PhraseMatcher(nlp.vocab, attr="LOWER")

    keyword_lookup = {}
    patterns = [nlp.make_doc(alias) for item in keywords_data for alias in item.get('aliases', [item['keyword']])]

    for item in keywords_data:
        for alias in item.get('aliases', [item['keyword']]):
            keyword_lookup[alias.lower()] = {
                'type': item.get('type'),
                'category': item.get('category'),
                'tier': item.get('relevance_tier')
            }

    matcher.add("ChinaKeywords", patterns)
    print(f"✅ PhraseMatcher 准备完成，已添加 {len(patterns)} 个模式。")

# --- 定义否决规则函数 (保持不变) ---
def check_negation(doc, keywords_in_doc):
    for token in doc:
        if token.dep_ == "neg" and token.head.text.lower() in keywords_in_doc:
            return True, f"否定语境: '{token.text}' 修饰了关键词 '{token.head.text}'"
    return False, ""

def check_hypothetical(doc, keywords_in_doc):
    for sent in doc.sents:
        # 改进检查逻辑，更准确
        if any(tok.lower_ in ['if', 'unless'] for tok in sent[:3]):
             if any(token.text.lower() in keywords_in_doc for token in sent):
                return True, f"假设语境: 句子包含 '{sent[:3].text.strip()}'..."
    return False, ""

def check_low_tier_only(found_keywords_info):
    if not found_keywords_info: return True, "未找到任何关键词"
    tiers = [info['tier'] for info in found_keywords_info]
    if all(tier <= 2 for tier in tiers):
        strong_categories = {"Politics", "Economics", "Geopolitics", "Technology", "Finance", "Military"}
        categories = {info['category'] for info in found_keywords_info}
        if not strong_categories.intersection(categories):
            return True, "只包含Tier 1/2的弱相关关键词 (如文化、体育)"
    return False, ""

print("✅ 块 4: 精筛规则定义和 Matcher 准备完成。")

--- 阶段二准备: 加载 spaCy 模型 ---
✅ spaCy 模型 'core_web_lg' 的核心组件加载成功。
正在准备 PhraseMatcher...
✅ PhraseMatcher 准备完成，已添加 397 个模式。
✅ 块 4: 精筛规则定义和 Matcher 准备完成。


# 步骤 5: 执行阶段二 - 精筛流程

**目标:** 加载候选集，应用所有否决规则，然后保存最终结果和被拒绝的文章。

In [None]:
# --- 执行阶段二 - 使用并行化的 spaCy 进行高效精筛 ---

print("--- 阶段二: 开始精筛候选集 ---")
start_time_s2 = time.time()

try:
    df_candidates = pd.read_csv(CANDIDATES_FILE, low_memory=False)
    print(f"✅ 成功加载 {len(df_candidates)} 篇候选文章。")
except FileNotFoundError:
    print(f"❌ 错误: 候选文件未找到 {CANDIDATES_FILE}。请先运行块 3。")
    df_candidates = pd.DataFrame()

if not df_candidates.empty and nlp:
    texts = df_candidates[NEWS_COLUMN].astype(str).tolist() # 转为list以获得最佳性能
    results = []

    print(f"开始使用 {N_PROCESSES} 个进程并行精筛...")
    # --- 核心优化：使用 n_process 并行处理 ---
    docs = nlp.pipe(texts, batch_size=BATCH_SIZE, n_process=N_PROCESSES)

    for doc in tqdm(docs, total=len(df_candidates), desc="精筛文章"):
        rejection_reason = ""
        is_rejected = False

        matches = matcher(doc)
        if not matches:
            results.append({'keep': False, 'rejection_reason': '未找到任何关键词(精筛阶段)'})
            continue

        found_keywords_text = {doc[start:end].text.lower() for match_id, start, end in matches}
        found_keywords_info = [keyword_lookup[kw] for kw in found_keywords_text if kw in keyword_lookup]

        is_rejected, rejection_reason = check_low_tier_only(found_keywords_info)
        if not is_rejected:
            is_rejected, rejection_reason = check_negation(doc, found_keywords_text)
        if not is_rejected:
            is_rejected, rejection_reason = check_hypothetical(doc, found_keywords_text)

        results.append({'keep': not is_rejected, 'rejection_reason': rejection_reason})

    # --- 合并与保存结果 ---
    print("\n正在合并精筛结果...")
    df_results = pd.DataFrame(results, index=df_candidates.index)
    df_final_with_reasons = pd.concat([df_candidates, df_results], axis=1)

    df_accepted = df_final_with_reasons[df_final_with_reasons['keep'] == True].drop(columns=['keep', 'rejection_reason'])
    df_rejected = df_final_with_reasons[df_final_with_reasons['keep'] == False].drop(columns=['keep'])

    print("\n--- 精筛完成 ---")
    df_accepted.to_csv(FINAL_RESULT_FILE, index=False, encoding='utf-8')
    print(f"✅ {len(df_accepted)} 篇最终文章已保存到: {FINAL_RESULT_FILE}")

    df_rejected.to_csv(REJECTED_FILE, index=False, encoding='utf-8')
    print(f"ℹ️ {len(df_rejected)} 篇被拒绝的文章已保存到: {REJECTED_FILE} (供分析)")

    end_time_s2 = time.time()
    print(f"阶段二 (精筛) 耗时: {(end_time_s2 - start_time_s2) / 60:.2f} 分钟。")

else:
    print("候选集为空或spaCy模型未加载，跳过精筛。")

print("\n✅ 块 5: 精筛流程执行完毕。")

--- 阶段二: 开始精筛候选集 ---
