处理路径

In [2]:
from pathlib import Path
import os

def find_project_root(start=None, markers=("整合数据", "原始数据", "README.md")):
    """
    从 start 目录开始向上查找，直到找到包含 markers 中任一标记文件/文件夹的目录，作为项目根目录。
    """
    cur = Path(start or os.getcwd()).resolve()
    for p in [cur] + list(cur.parents):
        if any((p / m).exists() for m in markers):
            return p
    raise RuntimeError(f"Cannot find project root from {cur}. Please check markers or working dir.")

BASE_DIR = find_project_root()
print("BASE_DIR =", BASE_DIR)


BASE_DIR = C:\Users\17514\Desktop\研一\统计基础\作业\大作业


# 处理Y的

In [3]:
# pip install pandas numpy scipy
import numpy as np
import pandas as pd

# ===== 1) 读入原始数据 =====
# 把路径换成你的文件
input_path = BASE_DIR /"原始数据"/"各行业etf资金流入（被解释变量）1.xlsx"   # 示例：CSV；若是 Excel，见下方注释
df_raw = pd.read_excel(input_path, sheet_name=0)  # 如果是 xlsx


# 统一列名（去空格）
df_raw.columns = [str(c).strip() for c in df_raw.columns]

# 定位“代码”行（第二行），保存代码映射后删除该行
code_row_idx = None
for i in range(min(5, len(df_raw))):
    if str(df_raw.iloc[i,0]).strip() in ["代码", "基金代码", "代码/基金"]:
        code_row_idx = i
        break

code_map = {}
if code_row_idx is not None:
    # 第一列是“代码”字样，其余列为代码
    for col in df_raw.columns[1:]:
        code_map[col] = str(df_raw.loc[code_row_idx, col])
    # 删除代码行
    df_raw = df_raw.drop(index=code_row_idx).reset_index(drop=True)

# 把日期列转为 datetime，并按日期降序/升序均可
date_col = df_raw.columns[0]
df_raw[date_col] = pd.to_datetime(df_raw[date_col])
df_raw = df_raw.sort_values(date_col).reset_index(drop=True)

# ===== 2) 计算每只 ETF 的 signed-log 指标 =====
flow_cols = df_raw.columns[1:]  # 除日期外的所有 ETF 列

# 把数值列转 float
for c in flow_cols:
    df_raw[c] = pd.to_numeric(df_raw[c], errors="coerce")

# 计算尺度 s_i = median(|Flow|)
scale = df_raw[flow_cols].abs().median(axis=0, skipna=True).replace(0, np.nan)
# 对极端小的尺度做兜底，避免除以 0
scale = scale.fillna(1.0)

def signed_log_transform(values, s):
    # values: pd.Series of flows for one ETF
    return np.sign(values) * np.log1p(np.abs(values) / s)

# 生成 slog 宽表
slog_wide = pd.DataFrame({date_col: df_raw[date_col]})
for c in flow_cols:
    s_i = scale[c]
    slog_wide[c] = signed_log_transform(df_raw[c], s_i)

# winsorize slog，默认 5%–95%
winsor = True
lower_q, upper_q = 0.05, 0.95
if winsor:
    for c in flow_cols:
        lo = slog_wide[c].quantile(lower_q)
        hi = slog_wide[c].quantile(upper_q)
        slog_wide[c] = slog_wide[c].clip(lower=lo, upper=hi)

# ===== 3) 生成“长表”便于后续 merge / 分组分析 =====
long = df_raw.melt(id_vars=[date_col], value_vars=flow_cols,
                   var_name="etf_name", value_name="flow_raw")
long["s_i"] = long["etf_name"].map(scale.to_dict())
long["y_slog"] = signed_log_transform(long["flow_raw"], long["s_i"])

if winsor:
    # 按每只 ETF 分别 winsorize
    def _clip_grp(g):
        lo = g["y_slog"].quantile(lower_q)
        hi = g["y_slog"].quantile(upper_q)
        g["y_slog"] = g["y_slog"].clip(lo, hi)
        return g
    long = long.groupby("etf_name", group_keys=False).apply(_clip_grp)

# 若你需要把“基金代码”也带上（来自第二行）
if code_map:
    long["etf_code"] = long["etf_name"].map(code_map)
else:
    long["etf_code"] = np.nan

# ===== 4) 导出结果 =====
# 宽表：每列一只 ETF 的 slog
from pathlib import Path

# 目标文件夹
out_dir = Path(BASE_DIR /"新数据")
out_dir.mkdir(parents=True, exist_ok=True)  # 若不存在则创建

# 文件名
output_wide = out_dir / "etf_y_slog_wide.csv"
output_long = out_dir / "etf_y_slog_long.csv"

# 导出
slog_wide.to_csv(output_wide, index=False, encoding="utf-8-sig")
long.to_csv(output_long, index=False, encoding="utf-8-sig")
print("完成：", output_wide, output_long)



完成： C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\etf_y_slog_wide.csv C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\etf_y_slog_long.csv


  long = long.groupby("etf_name", group_keys=False).apply(_clip_grp)


# D的处理


In [4]:
# pip install pandas numpy openpyxl
import os
from pathlib import Path
import numpy as np
import pandas as pd

# ================= 用户配置 =================
XLSX_PATH = BASE_DIR / "原始数据" / "政策特征分析结果.xlsx"

# 输出目录（建议统一放到一个可追踪的相对目录下）
OUT_DIR = BASE_DIR / "新数据"
OUT_DIR.mkdir(parents=True, exist_ok=True)

OUT_ALL  = OUT_DIR / "policy_D_daily__ALL.csv"      # 合并后的总表
OUT_WIDE = OUT_DIR / "policy_D_daily__wide.csv"     # 宽表（D_nat, D_loc, D_total）

COL_DATE   = "公布日期"
COL_FACTS  = ["层级", "新颖度", "财政性", "约束力度", "覆盖范围"]

NORM_WINDOW_DAYS = 30
EPS_DENOM = 1

WINDOW_START = pd.Timestamp("2025-05-15")
WINDOW_END   = pd.Timestamp("2025-10-31")

MEMORY_DAYS = 14
TAIL_AT_MEMORY = 0.01

lambda_day = -np.log(TAIL_AT_MEMORY) / MEMORY_DAYS
decay = float(np.exp(-lambda_day))
print(f"[info] MEMORY_DAYS={MEMORY_DAYS}, TAIL_AT_MEMORY={TAIL_AT_MEMORY}, "
      f"lambda_day={lambda_day:.4f}, decay={decay:.4f}, decay^{MEMORY_DAYS}={decay**MEMORY_DAYS:.4f}")

SHEETS_BY_INDUSTRY = {
    "汽车": {"国家": "汽车（国家）", "地方": "汽车（地方）"},
    "家用电器": {"国家": "家用电器（国家）", "地方": "家用电器（地方）"},
    # 注意：这里你原来国家/地方写成同一个 sheet 名，建议你确认后修改
    "餐饮": {"国家": "餐饮（国家）", "地方": "餐饮（地方）"},
    "旅游": {"国家": "旅游（国家）", "地方": "旅游（地方法规）"},
    "动漫游戏": {"国家": "动漫游戏（国家）", "地方": "动漫游戏（地方）"},
    "文娱": {"国家": "文娱（国家）", "地方": "文娱（地方）"},
}

EXTRA_DAYS_AFTER_LAST = MEMORY_DAYS


# ============== 工具函数 ==============
def _read_policies_from_sheet(xlsx_path, sheet_name):
    """从单个 sheet 读取（date, weight）两列，鲁棒日期/数值解析。"""
    df = pd.read_excel(xlsx_path, sheet_name=sheet_name)
    # 清理列名里的空格
    df.columns = [str(c).strip() for c in df.columns]

    need = [COL_DATE] + COL_FACTS
    miss = [c for c in need if c not in df.columns]
    if miss:
        raise ValueError(f"Sheet[{sheet_name}] 缺少列: {miss}")

    tmp = df[need].copy()

    # ---- 日期清洗：把非数字全去掉，再按 %Y%m%d 解析；若本来就是 datetime 也能过
    if not np.issubdtype(tmp[COL_DATE].dtype, np.datetime64):
        # 统一成字符串
        tmp[COL_DATE] = tmp[COL_DATE].astype(str)
        # 去掉所有非数字字符，例如 '2024. 05. 21' -> '20240521'
        tmp[COL_DATE] = tmp[COL_DATE].str.replace(r"\D", "", regex=True)
        # 过滤长度不对的
        tmp = tmp[tmp[COL_DATE].str.len() >= 8]
        tmp[COL_DATE] = pd.to_datetime(tmp[COL_DATE].str[:8], format="%Y%m%d", errors="coerce")

    tmp = tmp.dropna(subset=[COL_DATE])

    # ---- 权重列：转成数值，无法解析的按 1.0 处理（中性权重）
    for c in COL_FACTS:
        tmp[c] = pd.to_numeric(tmp[c], errors="coerce")
        tmp[c] = tmp[c].fillna(1.0)

    tmp["w"] = tmp[COL_FACTS].prod(axis=1)

    # 打印一下读取统计，便于检查
    print(f"[read] {sheet_name}: rows={len(df)}, valid_dates={tmp.shape[0]}, "
          f"date_range=({tmp[COL_DATE].min()}, {tmp[COL_DATE].max()})")

    return tmp.rename(columns={COL_DATE: "date"})[["date", "w"]]


def _decay_daily_series(policy_df, start_date, end_date, decay, memory_days):
    """
    生成“有限记忆”的指数衰减库存序列：
      D_t = decay * D_{t-1} + arrivals_t - decay**memory_days * arrivals_{t-memory_days}

    这样 D_t 只依赖最近 memory_days 天的 arrivals。
    返回:
      D: 指数衰减库存序列
      arrivals: 每日新到达的权重和（用于标准化）
    """
    idx = pd.date_range(start_date, end_date, freq="D")
    arrivals = pd.Series(0.0, index=idx, name="arrivals")
    if not policy_df.empty:
        arrivals = policy_df.groupby("date")["w"].sum().reindex(idx, fill_value=0.0)
        arrivals.name = "arrivals"

    arrivals_vals = arrivals.values
    D_vals = np.zeros(len(idx), dtype=float)
    decay_k = decay ** memory_days  # decay 的 memory_days 次方

    for t in range(len(idx)):
        prev = D_vals[t-1] if t > 0 else 0.0
        D_t = decay * prev + arrivals_vals[t]
        # 超过 memory_days 的那一日从库存中剔除
        if t >= memory_days:
            D_t -= decay_k * arrivals_vals[t - memory_days]
        D_vals[t] = D_t

    D = pd.Series(D_vals, index=idx, name="D")
    return D, arrivals


# ============== 主流程 ==============
all_rows_long = []   # 长表：industry, tier, date, D
date_min, date_max = None, None

# 先扫描所有 sheet，拿到全局起止日期
for ind, tiers in SHEETS_BY_INDUSTRY.items():
    for tier, sh in tiers.items():
        dfj = _read_policies_from_sheet(XLSX_PATH, sh)
        if not dfj.empty:
            dmin, dmax = dfj["date"].min(), dfj["date"].max()
            date_min = dmin if date_min is None else min(date_min, dmin)
            date_max = dmax if date_max is None else max(date_max, dmax)

if date_min is None:
    raise RuntimeError("没有读到任何政策日期，请检查 SHEETS_BY_INDUSTRY 和列名。")

start_date = date_min
end_date   = date_max + pd.Timedelta(days=EXTRA_DAYS_AFTER_LAST)

# 分行业逐个生成（国家/地方/合计）
per_industry_files = []
for ind, tiers in SHEETS_BY_INDUSTRY.items():
    # 读国家/地方两张表（允许缺一张）
    nat_df = _read_policies_from_sheet(XLSX_PATH, tiers.get("国家")) if tiers.get("国家") else pd.DataFrame(columns=["date","w"])
    loc_df = _read_policies_from_sheet(XLSX_PATH, tiers.get("地方")) if tiers.get("地方") else pd.DataFrame(columns=["date","w"])

    # 生成 D 与 arrivals（有限记忆版本）
    D_nat, arr_nat = _decay_daily_series(nat_df, start_date, end_date, decay=decay, memory_days=MEMORY_DAYS)
    D_loc, arr_loc = _decay_daily_series(loc_df, start_date, end_date, decay=decay, memory_days=MEMORY_DAYS)

    D_total = D_nat.add(D_loc, fill_value=0.0)
    arr_total = arr_nat.add(arr_loc, fill_value=0.0)

    # --- 防单增标准化：用最近 W 天到达量做归一 ---
    den_nat   = arr_nat.rolling(NORM_WINDOW_DAYS, min_periods=1).sum()
    den_loc   = arr_loc.rolling(NORM_WINDOW_DAYS, min_periods=1).sum()
    den_total = arr_total.rolling(NORM_WINDOW_DAYS, min_periods=1).sum()

    D_nat_norm   = D_nat   / (den_nat   + EPS_DENOM)
    D_loc_norm   = D_loc   / (den_loc   + EPS_DENOM)
    D_total_norm = D_total / (den_total + EPS_DENOM)

    # 宽表（该行业）
    df_wide = pd.DataFrame({
        "industry": ind,
        "date": D_total.index,
        "D_nat": D_nat.values,
        "D_loc": D_loc.values,
        "D_total": D_total.values,
        "D_nat_norm": D_nat_norm.values,
        "D_loc_norm": D_loc_norm.values,
        "D_total_norm": D_total_norm.values
    })

    # 仅保留指定窗口（如 5/15~10/31）
    df_wide = df_wide[(df_wide["date"] >= WINDOW_START) & (df_wide["date"] <= WINDOW_END)].reset_index(drop=True)

    # 单行业文件
    out_ind = OUT_DIR / f"policy_D_daily__{ind}.csv"
    df_wide.to_csv(out_ind, index=False, encoding="utf-8-sig")
    per_industry_files.append(out_ind)

    # 长表（累积）——也把归一化列带上
    all_rows_long.append(
        df_wide.melt(
            id_vars=["industry", "date"],
            value_vars=["D_nat", "D_loc", "D_total", "D_nat_norm", "D_loc_norm", "D_total_norm"],
            var_name="which", value_name="D"
        )
    )

# 合并输出
D_long = pd.concat(all_rows_long, ignore_index=True)
D_long = D_long[(D_long["date"] >= WINDOW_START) & (D_long["date"] <= WINDOW_END)].reset_index(drop=True)
D_long.to_csv(OUT_ALL, index=False, encoding="utf-8-sig")

D_wide_all = D_long.pivot_table(index=["industry", "date"], columns="which", values="D", aggfunc="sum").reset_index()
# 确保三列齐全
for c in ["D_nat", "D_loc", "D_total"]:
    if c not in D_wide_all.columns:
        D_wide_all[c] = 0.0
D_wide_all = D_wide_all.sort_values(["industry", "date"])
D_wide_all.to_csv(OUT_WIDE, index=False, encoding="utf-8-sig")

print(f"[OK] 全部完成：\n- 总长表: {OUT_ALL}\n- 总宽表: {OUT_WIDE}\n- 分行业: ")
for p in per_industry_files:
    print("   ", p)


[info] MEMORY_DAYS=14, TAIL_AT_MEMORY=0.01, lambda_day=0.3289, decay=0.7197, decay^14=0.0100
[read] 汽车（国家）: rows=16, valid_dates=16, date_range=(2025-05-21 00:00:00, 2025-10-14 00:00:00)
[read] 汽车（地方）: rows=198, valid_dates=198, date_range=(2025-05-01 00:00:00, 2025-10-30 00:00:00)
[read] 家用电器（国家）: rows=22, valid_dates=22, date_range=(2025-05-06 00:00:00, 2025-11-01 00:00:00)
[read] 家用电器（地方）: rows=34, valid_dates=34, date_range=(2025-05-01 00:00:00, 2025-10-20 00:00:00)
[read] 餐饮（国家）: rows=18, valid_dates=15, date_range=(2025-05-07 00:00:00, 2025-10-13 00:00:00)
[read] 餐饮（地方）: rows=81, valid_dates=64, date_range=(2025-05-14 00:00:00, 2025-10-30 00:00:00)
[read] 旅游（国家）: rows=36, valid_dates=36, date_range=(2025-05-05 00:00:00, 2025-10-29 00:00:00)
[read] 旅游（地方法规）: rows=38, valid_dates=38, date_range=(2025-05-18 00:00:00, 2025-10-25 00:00:00)
[read] 动漫游戏（国家）: rows=9, valid_dates=9, date_range=(2025-04-30 00:00:00, 2025-10-31 00:00:00)
[read] 动漫游戏（地方）: rows=40, valid_dates=37, date_range=

# 处理OI的
因为有六个行业需要分别处理，定义处理原始数据的函数，后续直接调用

In [5]:
# pip install pandas numpy openpyxl
import re
import numpy as np
import pandas as pd
from pathlib import Path
from typing import List, Sequence, Optional, Tuple, Union

# =========================
# 小工具（和你原逻辑一致）
# =========================

DATE_COL_CAND   = ["时间", "date", "日期"]
REGION_COL_CAND = ["省份/市", "地区", "region"]
KW_COL_CAND     = ["关键词", "keyword"]
TOTAL_COL_CAND  = ["搜索pc+移动", "搜索pc＋移动", "总搜索", "总量"]
PC_COL_CAND     = ["搜索pc", "pc"]
MOBILE_COL_CAND = ["搜索移动", "移动", "mobile"]

def read_any(path_stem: Path) -> Tuple[pd.DataFrame, Path]:
    """path_stem 为不带后缀的 Path，自动尝试 .xlsx/.csv"""
    for ext in [".xlsx", ".csv"]:
        path = path_stem.with_suffix(ext)
        if path.exists():
            if ext == ".xlsx":
                return pd.read_excel(path, sheet_name=0), path
            else:
                for enc in ["utf-8-sig", "utf-8", "gbk"]:
                    try:
                        df = pd.read_csv(path, sep=None, engine="python", encoding=enc)
                        return df, path
                    except Exception:
                        pass
    raise FileNotFoundError(f"找不到文件：{path_stem}(.xlsx/.csv)")

def pick_col(df: pd.DataFrame, cands: Sequence[str], default: Optional[str] = None) -> Optional[str]:
    cols = [c for c in cands if c in df.columns]
    return cols[0] if cols else default

def clean_numeric(s: pd.Series) -> pd.Series:
    return pd.to_numeric(
        s.astype(str).str.replace(",", "", regex=False)
         .str.replace(" ", "", regex=False)
         .str.replace("（", "(", regex=False).str.replace("）", ")", regex=False)
         .str.replace(r"[^\d\.\-]", "", regex=True),
        errors="coerce"
    )

def winsorize_series(s: pd.Series, lo: float = 0.05, hi: float = 0.99) -> pd.Series:
    ql, qh = s.quantile(lo), s.quantile(hi)
    return s.clip(lower=ql, upper=qh)

def _filter_national_rows(df: pd.DataFrame, region_col: Optional[str]) -> pd.DataFrame:
    """若有地区列，只保留 全国/中国/全部 或缺失。"""
    if region_col is None:
        return df
    reg = df[region_col].astype(str).str.strip()
    keep = reg.isin(["全国", "中国", "全部"]) | df[region_col].isna()
    return df.loc[keep].copy()

# =========================
# 主函数：构造并导出 OI
# =========================

def build_oi_from_baidu_files(
    p_raw: Union[str, Path],
    p_out: Union[str, Path],
    industry_tag: str,
    file_stems: Sequence[str],
    *,
    date_col_cand: Sequence[str] = DATE_COL_CAND,
    region_col_cand: Sequence[str] = REGION_COL_CAND,
    kw_col_cand: Sequence[str] = KW_COL_CAND,
    total_col_cand: Sequence[str] = TOTAL_COL_CAND,
    pc_col_cand: Sequence[str] = PC_COL_CAND,
    mobile_col_cand: Sequence[str] = MOBILE_COL_CAND,
    winsor_lo: float = 0.05,
    winsor_hi: float = 0.99,
    log1p: bool = True,
    export_wide: bool = True,
    export_simple: bool = True,
) -> Tuple[pd.DataFrame, Optional[Path], Optional[Path]]:
    """
    读取同一行业的多个关键词文件（xlsx/csv），构造日度 OI 并导出。

    参数
    - p_raw: 原始关键词文件所在文件夹（如 .../原始数据/指数/文娱传媒）
    - p_out: 输出文件夹（如 .../新数据）
    - industry_tag: 行业名，用于输出文件名
    - file_stems: 文件名列表（不带后缀），可为 6 个或任意个

    返回
    - Z: 宽表（index=date；列含各关键词 z_*, OI_mean, K_used）
    - out_wide_path / out_simple_path: 导出路径（若对应 export_* 为 False 则为 None）
    """
    p_raw = Path(p_raw)
    p_out = Path(p_out)
    p_out.mkdir(parents=True, exist_ok=True)

    dfs = []
    for stem in file_stems:
        df, real_path = read_any(p_raw / stem)
        df.columns = [str(c).strip() for c in df.columns]

        date_col   = pick_col(df, date_col_cand, df.columns[0])
        region_col = pick_col(df, region_col_cand, None)
        kw_col     = pick_col(df, kw_col_cand, None)

        total_col  = pick_col(df, total_col_cand, None)
        pc_col     = pick_col(df, pc_col_cand, None)
        mob_col    = pick_col(df, mobile_col_cand, None)

        use = df.copy()
        use = _filter_national_rows(use, region_col)

        use["date"] = pd.to_datetime(use[date_col], errors="coerce")
        use = use.dropna(subset=["date"])

        # 构造 BI
        if total_col and total_col in use.columns:
            bi = clean_numeric(use[total_col])
        else:
            if pc_col is None or mob_col is None:
                raise ValueError(f"{real_path} 缺少总量列，且 pc/移动列不全。")
            bi = clean_numeric(use[pc_col]) + clean_numeric(use[mob_col])

        # 关键词名（优先用文件内关键词列，否则用 stem）
        if kw_col and kw_col in use.columns:
            kw_name = str(use[kw_col].dropna().iloc[0]) if not use[kw_col].dropna().empty else str(stem)
        else:
            kw_name = str(stem)

        dfs.append(pd.DataFrame({"date": use["date"], "keyword": kw_name, "BI": bi}))

    raw_all = pd.concat(dfs, ignore_index=True).sort_values("date")

    # 每个关键词：winsorize + (log1p) + z-score
    zs = []
    for kw, g in raw_all.groupby("keyword"):
        s = g.set_index("date")["BI"].sort_index()
        s = winsorize_series(s, winsor_lo, winsor_hi)

        x = np.log1p(s) if log1p else s
        mu, sd = x.mean(), x.std(ddof=1)
        if not np.isfinite(sd) or sd == 0:
            sd = 1.0

        z = ((x - mu) / sd).rename(f"z_{kw}")
        zs.append(z)

    Z = pd.concat(zs, axis=1).sort_index()
    Z["OI_mean"] = Z.mean(axis=1, skipna=True)
    Z["K_used"]  = Z.notna().sum(axis=1)

    out_wide_path = None
    out_simple_path = None

    if export_wide:
        out_wide_path = p_out / f"OI_daily_{industry_tag}.csv"
        Z.reset_index().rename(columns={"index": "date"}).to_csv(out_wide_path, index=False, encoding="utf-8-sig")

    if export_simple:
        out_simple_path = p_out / f"OI_daily_{industry_tag}_simple.csv"
        Z[["OI_mean"]].reset_index().to_csv(out_simple_path, index=False, encoding="utf-8-sig")

    return Z, out_wide_path, out_simple_path

# =========================
# 便捷封装：按你的 p_base 结构调用
# =========================

def build_oi_one_industry(
    p_base: Union[str, Path],
    industry_tag: str,
    file_stems: Sequence[str],
    *,
    raw_subdir: Sequence[str] = ("原始数据", "指数"),
    out_subdir: Sequence[str] = ("新数据",),
) -> Tuple[pd.DataFrame, Optional[Path], Optional[Path]]:
    """
    假设你的目录结构：
    p_base/原始数据/指数/<industry_tag>/[1.xlsx|1.csv ...]
    p_base/新数据/...
    """
    p_base = Path(p_base)
    p_raw = p_base.joinpath(*raw_subdir, industry_tag)
    p_out = p_base.joinpath(*out_subdir)
    return build_oi_from_baidu_files(p_raw=p_raw, p_out=p_out, industry_tag=industry_tag, file_stems=file_stems)

def build_oi_batch(
    p_base: Union[str, Path],
    jobs: Sequence[Tuple[str, Sequence[str]]],
) -> List[Tuple[str, Optional[Path], Optional[Path]]]:
    """
    批量跑多个行业或多组文件名。
    jobs: [(industry_tag, [file1, file2, ...]), ...]
    返回每个行业的导出路径。
    """
    results = []
    for industry_tag, stems in jobs:
        _, out_wide, out_simple = build_oi_one_industry(p_base, industry_tag, stems)
        results.append((industry_tag, out_wide, out_simple))
    return results


调用

In [6]:


jobs = [
    ("文娱传媒", ["1","2","3"]),
    ("动漫游戏", ["1","2","3"]),
    ("家用电器",     ["1","2","3"]),
    ("旅游",     ["1","2","3"]),
    ("新能源汽车",     ["1","2","3"]),
    ("食品",     ["1","2","3"]),
]

res = build_oi_batch(BASE_DIR, jobs)
for industry, wide_path, simple_path in res:
    print(industry, wide_path, simple_path)


文娱传媒 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_文娱传媒.csv C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_文娱传媒_simple.csv
动漫游戏 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_动漫游戏.csv C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_动漫游戏_simple.csv
家用电器 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_家用电器.csv C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_家用电器_simple.csv
旅游 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_旅游.csv C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_旅游_simple.csv
新能源汽车 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_新能源汽车.csv C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_新能源汽车_simple.csv
食品 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_食品.csv C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\OI_daily_食品_simple.csv


# 处理ET的

同样也是需要六个行业分别处理，也是先写函数再调用

In [7]:
# pip install pandas numpy chardet
import re
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Sequence, Optional, Tuple, Union, List

# =========================
# 默认候选列名（可按需覆盖）
# =========================
DATE_COL_CAND_DEFAULT  = ["update", "日期", "date", "时间"]
SCORE_COL_CAND_DEFAULT = ["sentiment_score", "sentiment", "score", "情感分"]

# =========================
# 小工具
# =========================
def read_csv_auto(path: Union[str, Path],
                  encodings: Sequence[str] = ("utf-8-sig", "utf-8", "gbk")) -> Tuple[pd.DataFrame, str]:
    """自动尝试编码 + 自动分隔符"""
    path = str(path)
    last_err = None
    for enc in encodings:
        try:
            df = pd.read_csv(path, sep=None, engine="python", encoding=enc)
            return df, enc
        except Exception as e:
            last_err = e
    raise RuntimeError(f"读入CSV失败：请确认路径/编码/分隔符。最后错误：{last_err}")

def pick_col(df: pd.DataFrame, cands: Sequence[str]) -> Optional[str]:
    for c in cands:
        if c in df.columns:
            return c
    return None

def parse_date_any(s, default_year: int = 2025) -> pd.Timestamp:
    """
    多格式日期解析：
    1) 2025-10-31 / 2025/10/31 / 2025.10.31 / 2025年10月31日
    2) 10月31日 / 10/31 / 10-31（补 default_year）
    3) pandas 兜底
    """
    x = str(s).strip()
    if not x:
        return pd.NaT

    m = re.match(r"^\s*(\d{4})[年\-/\.](\d{1,2})[月\-/\.](\d{1,2})[日]?\s*$", x)
    if m:
        y, mo, d = map(int, m.groups())
        return pd.Timestamp(year=y, month=mo, day=d)

    m = re.match(r"^\s*(\d{1,2})\s*[月/\-\.]\s*(\d{1,2})\s*[日]?\s*$", x)
    if m:
        mo, d = map(int, m.groups())
        return pd.Timestamp(year=default_year, month=mo, day=d)

    try:
        return pd.to_datetime(x, errors="raise")
    except Exception:
        return pd.NaT

# =========================
# 主函数：从“单个行业的情绪CSV”构造 ET 日度序列并导出
# =========================
def build_et_from_sentiment_csv(
    input_path: Union[str, Path],
    out_dir: Union[str, Path],
    industry_tag: str,
    *,
    date_col_cand: Sequence[str] = DATE_COL_CAND_DEFAULT,
    score_col_cand: Sequence[str] = SCORE_COL_CAND_DEFAULT,
    default_year: int = 2025,
    ema_halflife: float = 7,
    export: bool = True,
    out_filename: Optional[str] = None,
    verbose: bool = True,
) -> Tuple[pd.DataFrame, Optional[Path]]:
    """
    读取行业情绪明细CSV（含日期列 + sentiment score列），聚合为日度：
      - ET_mean: 日均值
      - ET_var : 日方差
      - ET_n   : 样本数
      - ET_z   : 对 ET_mean 做 z-score
      - ET_ema_hl{halflife}: 对 ET_z 做 EWM(halflife=7) 平滑
    并导出到 out_dir/ET_daily_{industry}.csv

    返回：
      daily_df, out_path(若 export=False 则 None)
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    df, encoding_used = read_csv_auto(input_path)
    df.columns = [str(c).strip() for c in df.columns]

    DATE_COL = pick_col(df, date_col_cand)
    SCORE_COL = pick_col(df, score_col_cand)
    if DATE_COL is None:
        raise ValueError(f"找不到日期列（候选：{list(date_col_cand)}），现有列：{df.columns.tolist()}")
    if SCORE_COL is None:
        raise ValueError(f"找不到情感分列（候选：{list(score_col_cand)}），现有列：{df.columns.tolist()}")

    if verbose:
        print(f">>> 读入成功：{input_path} (encoding={encoding_used})")
        print(">>> 列名：", df.columns.tolist())
        print(">>> 日期列/情感列：", DATE_COL, "/", SCORE_COL)
        print(">>> 日期列前10个原始值：", df[DATE_COL].astype(str).head(10).tolist())

    # 日期解析
    df["date"] = df[DATE_COL].apply(lambda x: parse_date_any(x, default_year=default_year))
    before = len(df)
    df = df.dropna(subset=["date"]).copy()
    after = len(df)
    if verbose:
        print(f">>> 日期可解析行数：{after}/{before}")

    # 情感分转数值
    df[SCORE_COL] = pd.to_numeric(df[SCORE_COL], errors="coerce")
    non_na_before = len(df)
    df = df.dropna(subset=[SCORE_COL]).copy()
    if verbose:
        print(f">>> 情感分非缺失行数：{len(df)}/{non_na_before}（丢弃 {non_na_before - len(df)} 行非数值）")

    # 日度聚合
    daily = (
        df.groupby("date")[SCORE_COL]
          .agg(ET_mean="mean", ET_var="var", ET_n="size")
          .reset_index()
          .sort_values("date")
          .reset_index(drop=True)
    )
    if len(daily) == 0:
        raise RuntimeError("聚合后为空：多半是日期或情感分列没识别到；请检查列名候选与原始数据。")

    # 标准化 + EMA(半衰期)
    mu = daily["ET_mean"].mean()
    sd = daily["ET_mean"].std(ddof=1)
    if not np.isfinite(sd) or sd == 0:
        sd = 1.0

    daily["ET_z"] = (daily["ET_mean"] - mu) / sd
    daily[f"ET_ema_hl{ema_halflife:g}"] = daily["ET_z"].ewm(halflife=ema_halflife, min_periods=1).mean()

    out_path = None
    if export:
        if out_filename is None:
            out_filename = f"ET_daily_{industry_tag}.csv"
        out_path = out_dir / out_filename
        daily.to_csv(out_path, index=False, encoding="utf-8-sig")
        if verbose:
            print(">>> 已导出：", out_path)
            print(daily.head(10))

    return daily, out_path

# =========================
# 便捷封装：按你的 p_base 结构调用（和 OI 一致）
# =========================
def build_et_one_industry(
    p_base: Union[str, Path],
    industry_tag: str,
    input_filename: str,
    *,
    raw_subdir: Sequence[str] = ("原始数据",),
    out_subdir: Sequence[str] = ("新数据",),
    **kwargs
) -> Tuple[pd.DataFrame, Optional[Path]]:
    """
    假设情绪明细在：
      p_base/原始数据/<input_filename>
    输出到：
      p_base/新数据/ET_daily_{industry}.csv
    """
    p_base = Path(p_base)
    input_path = p_base.joinpath(*raw_subdir, input_filename)
    out_dir = p_base.joinpath(*out_subdir)
    return build_et_from_sentiment_csv(
        input_path=input_path,
        out_dir=out_dir,
        industry_tag=industry_tag,
        **kwargs
    )

def build_et_batch(
    p_base: Union[str, Path],
    jobs: Sequence[Tuple[str, str]],
    *,
    raw_subdir: Sequence[str] = ("原始数据",),
    out_subdir: Sequence[str] = ("新数据",),
    **kwargs
) -> List[Tuple[str, Optional[Path]]]:
    """
    批量跑多个行业：
      jobs = [(industry_tag, input_filename), ...]
    返回每个行业导出路径
    """
    results = []
    for industry_tag, input_filename in jobs:
        _, out_path = build_et_one_industry(
            p_base=p_base,
            industry_tag=industry_tag,
            input_filename=input_filename,
            raw_subdir=raw_subdir,
            out_subdir=out_subdir,
            **kwargs
        )
        results.append((industry_tag, out_path))
    return results


调用

In [8]:


jobs = [
    ("文娱传媒", "merged_文娱传媒_top3_per_day_sentiment.csv"),
    ("动漫", "merged_动漫_top3_per_day_sentiment.csv"),
    ("家电",     "merged_家电_top3_per_day_sentiment.csv"),
    ("旅游",     "merged_旅游_top3_per_day_sentiment.csv"),
    ("汽车",     "merged_汽车_top3_per_day_sentiment.csv"),
    ("食品饮料",     "merged_食品饮料_top3_per_day_sentiment.csv"),
]

res = build_et_batch(
    p_base=BASE_DIR,
    jobs=jobs,
    default_year=2025,
    ema_halflife=7,
    verbose=False   # 批量时建议关掉打印
)

for industry, path in res:
    print(industry, path)


文娱传媒 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\ET_daily_文娱传媒.csv
动漫 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\ET_daily_动漫.csv
家电 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\ET_daily_家电.csv
旅游 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\ET_daily_旅游.csv
汽车 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\ET_daily_汽车.csv
食品饮料 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\ET_daily_食品饮料.csv


# 处理宏观控制变量的

In [9]:
# pip install pandas numpy openpyxl
import numpy as np
import pandas as pd
from pathlib import Path

# ========= 路径设置 =========
input_path = BASE_DIR / "原始数据" / "宏观控制变量.xlsx"
out_dir    = BASE_DIR / "新数据"
out_dir.mkdir(parents=True, exist_ok=True)

# ========= 读入 & 解析表头 =========
df0 = pd.read_excel(input_path, header=None)

# 第1~4行是表头信息
names_row   = df0.iloc[0].astype(str).str.strip().tolist()
freq_row    = df0.iloc[1].astype(str).str.strip().tolist()
unit_row    = df0.iloc[2].astype(str).str.strip().tolist()
# 第1列是“指标名称/频率/单位/指标ID”，后面列是各指标数据
data = df0.iloc[4:].copy()  # 从第5行开始是数据

# 设定列名：第一列叫 'date_raw'，其他列用“指标名称行”的名字
colnames = ["date_raw"] + names_row[1:]
data.columns = colnames

# ========= 日期解析（YYYYMMDD -> datetime） =========
data["date"] = pd.to_datetime(data["date_raw"].astype(str), format="%Y%m%d", errors="coerce")
data = data.dropna(subset=["date"]).sort_values("date").reset_index(drop=True)

# ========= 数值化：去掉千分位逗号/空格 =========
value_cols = colnames[1:]
for c in value_cols:
    data[c] = (
        data[c]
        .astype(str)
        .str.replace(",", "", regex=False)
        .str.replace(" ", "", regex=False)
        .replace({"": np.nan, "nan": np.nan})
    )
    data[c] = pd.to_numeric(data[c], errors="coerce")

# ========= 频率识别：来自第二行（freq_row） =========
# 第0个元素是“频率”文本，后续与各列一一对应
freq_map = dict(zip(value_cols, freq_row[1:]))

# ========= 缺失处理规则 =========
df = data.set_index("date").copy()

# 1) 月频：按“当月最后一个已知值”整月铺满；若该月全缺，用上一月值
for c in value_cols:
    if "月" in freq_map.get(c, ""):
        # 先得到“每月最后一个有效观测值”
        monthly_last = df[c].groupby([df.index.year, df.index.month]).apply(lambda s: s.ffill().iloc[-1])
        # 生成一个 {year,month} -> 值 的映射
        ym_val = {}
        for (y, m), v in monthly_last.items():
            ym_val[(y, m)] = v
        # 回填不存在的月份：用上一月值递推
        # 取数据所覆盖的所有 (y,m)
        years = sorted(df.index.year.unique())
        # 构造有序 (y,m) 序列
        ym_all = sorted({(d.year, d.month) for d in df.index})
        last_v = np.nan
        for y, m in ym_all:
            if (y, m) in ym_val and pd.notna(ym_val[(y, m)]):
                last_v = ym_val[(y, m)]
            else:
                ym_val[(y, m)] = last_v
        # 按天赋值
        df[c] = [ ym_val[(d.year, d.month)] for d in df.index ]

# 2) 日频：前向填充，再后向填充（防止开头为 NaN）
for c in value_cols:
    if "日" in freq_map.get(c, ""):
        df[c] = df[c].ffill().bfill()

df = df.reset_index()  # 还原日期列

# ========= 导出：原值填充版 =========
out_filled = out_dir / "controls_filled_daily.csv"
df.to_csv(out_filled, index=False, encoding="utf-8-sig")

# ========= 导出：标准化（z-score）版（便于进模型） =========
zdf = df.copy()
for c in value_cols:
    mu = zdf[c].mean()
    sd = zdf[c].std(ddof=1)
    if not np.isfinite(sd) or sd == 0:
        sd = 1.0
    zdf[c] = (zdf[c] - mu) / sd

out_z = out_dir / "controls_filled_daily_zscore.csv"
zdf.to_csv(out_z, index=False, encoding="utf-8-sig")

print("已导出：")
print("-", out_filled)
print("-", out_z)


已导出：
- C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\controls_filled_daily.csv
- C:\Users\17514\Desktop\研一\统计基础\作业\大作业\新数据\controls_filled_daily_zscore.csv


# 整合数据

用类的方式整合数据

In [10]:
# pip install pandas numpy openpyxl
import re
import numpy as np
import pandas as pd
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, Sequence, Dict, List, Tuple, Union

# =========================
# 行业配置：把“杂乱参数”收敛成一个对象
# =========================
@dataclass
class IndustryConfig:
    # 输出标识
    industry_tag: str          # 输出文件名前缀，例如 "文娱"
    industry_name_for_D: str   # D 文件里 industry 列的取值，例如 "文娱"

    # Y 宽表里 ETF 对应列名
    etf_col_name_in_ywide: str

    # xlsx 行业控制变量列名映射
    premium_col_name_in_xlsx: str
    mom1w_col_name_in_xlsx: str

    # 每行业输入文件名（都相对 p_new）
    filename_et_daily: str
    filename_oi_daily: str
    filename_D_single: str

    # 可选：如果你未来每行业的 Y 宽表不是统一文件，也可加字段覆盖
    filename_y_wide: str = "etf_y_slog_wide.csv"


class DataPipeline:
    """
    统一管理路径、默认参数、IO、缓存，以及 OI/ET/合并面板的生产过程
    """

    def __init__(
        self,
        p_base: Union[str, Path],
        *,
        dir_raw: str = "原始数据",
        dir_new: str = "新数据",
        dir_final: str = "整合数据",
        filename_ctrl_z: str = "controls_filled_daily_zscore.csv",
        filename_industry_xlsx: str = "各行业etf控制变量.xlsx",
        sheet_premium: str = "溢价率（各行业估值水平）",
        sheet_mom1w: str = "近1周收益率动量",
        d_col: str = "D_total_norm",
        prefer_halflife: str = "7",
        verbose: bool = True,
    ):
        self.p_base = Path(p_base)
        self.p_raw = self.p_base / dir_raw
        self.p_new = self.p_base / dir_new
        self.p_final = self.p_base / dir_final

        self.p_new.mkdir(parents=True, exist_ok=True)
        self.p_final.mkdir(parents=True, exist_ok=True)

        self.filename_ctrl_z = filename_ctrl_z
        self.filename_industry_xlsx = filename_industry_xlsx
        self.sheet_premium = sheet_premium
        self.sheet_mom1w = sheet_mom1w

        self.d_col = d_col
        self.prefer_halflife = prefer_halflife
        self.verbose = verbose

        # 简单缓存，避免重复读（尤其是 controls 与 xlsx）
        self._cache: Dict[str, pd.DataFrame] = {}

    # ---------- 基础IO ----------
    @staticmethod
    def _read_csv_any(path: Union[str, Path]) -> pd.DataFrame:
        path = str(path)
        for enc in ["utf-8-sig", "utf-8", "gbk"]:
            try:
                return pd.read_csv(path, sep=None, engine="python", encoding=enc)
            except Exception:
                continue
        raise RuntimeError(f"读取CSV失败：{path}")

    @staticmethod
    def _pick_date_col(df: pd.DataFrame) -> str:
        return "date" if "date" in df.columns else df.columns[0]

    @staticmethod
    def _parse_date_series(s: pd.Series) -> pd.Series:
        return pd.to_datetime(s, errors="coerce")

    def _log(self, *args):
        if self.verbose:
            print(*args)

    # ---------- 列选择策略 ----------
    def _pick_et_col(self, df: pd.DataFrame) -> str:
        c1 = f"ET_ema_hl{self.prefer_halflife}"
        if c1 in df.columns:
            return c1
        ema_cols = sorted([c for c in df.columns if c.startswith("ET_ema_hl")])
        if ema_cols:
            return ema_cols[0]
        for c in ["ET_z", "ET_mean"]:
            if c in df.columns:
                return c
        raise ValueError(f"ET 文件中未找到可用列：{df.columns.tolist()}")

    def _pick_oi_col(self, df: pd.DataFrame) -> str:
        c1 = f"OI_ema_hl{self.prefer_halflife}"
        if c1 in df.columns:
            return c1
        if "OI_mean" in df.columns:
            return "OI_mean"
        z_cols = [c for c in df.columns if c.startswith("z_")]
        if z_cols:
            # fallback：均值
            df["_OI_fallback"] = df[z_cols].mean(axis=1, skipna=True)
            return "_OI_fallback"
        raise ValueError(f"OI 文件中未找到可用列：{df.columns.tolist()}")

    # ---------- 读取公共输入（带缓存） ----------
    def load_controls_z(self) -> pd.DataFrame:
        key = "controls_z"
        if key in self._cache:
            return self._cache[key].copy()

        path = self.p_new / self.filename_ctrl_z
        dc = self._read_csv_any(path)
        dc.columns = [str(c).strip() for c in dc.columns]
        date_col = self._pick_date_col(dc)
        dc[date_col] = self._parse_date_series(dc[date_col])
        dc = dc.dropna(subset=[date_col]).rename(columns={date_col: "date"})
        self._cache[key] = dc.copy()
        return dc

    def read_industry_col(self, sheet: str, industry_colname: str, varname: str) -> pd.DataFrame:
        # xlsx 读取也做缓存：按 sheet 缓存整张表，避免每行业重复读
        key = f"xlsx::{sheet}"
        if key not in self._cache:
            xlsx_path = self.p_raw / self.filename_industry_xlsx
            df = pd.read_excel(str(xlsx_path), sheet_name=sheet)
            df.columns = [str(c).strip() for c in df.columns]
            self._cache[key] = df.copy()

        df = self._cache[key].copy()
        date_col = "date" if "date" in df.columns else ("日期" if "日期" in df.columns else df.columns[0])

        if industry_colname not in df.columns:
            raise ValueError(f"[{sheet}] 找不到列：{industry_colname}；可选列：{df.columns.tolist()}")

        out = df[[date_col, industry_colname]].copy()
        out[date_col] = pd.to_datetime(out[date_col], errors="coerce")
        out = out.dropna(subset=[date_col])

        out[industry_colname] = (
            out[industry_colname].astype(str)
            .str.replace(",", "", regex=False)
            .str.replace(" ", "", regex=False)
            .replace({"": np.nan, "nan": np.nan})
        )
        out[industry_colname] = pd.to_numeric(out[industry_colname], errors="coerce")
        out = out.rename(columns={date_col: "date", industry_colname: varname})
        return out[["date", varname]]

    # =========================
    # 核心：合并一个行业面板
    # =========================
    def merge_panel(self, cfg: IndustryConfig, *, export_trading_only: bool = True) -> Tuple[pd.DataFrame, pd.DataFrame, Path, Optional[Path]]:
        # ---- Y ----
        path_y = self.p_new / cfg.filename_y_wide
        dy = self._read_csv_any(path_y)
        dy.columns = [str(c).strip() for c in dy.columns]
        date_col_y = dy.columns[0]
        dy[date_col_y] = self._parse_date_series(dy[date_col_y])
        dy = dy.dropna(subset=[date_col_y])

        if cfg.etf_col_name_in_ywide not in dy.columns:
            raise ValueError(f"在 {path_y} 中找不到列：{cfg.etf_col_name_in_ywide}")
        dy = dy[[date_col_y, cfg.etf_col_name_in_ywide]].rename(columns={date_col_y: "date", cfg.etf_col_name_in_ywide: "Y"})

        # ---- ET ----
        det = self._read_csv_any(self.p_new / cfg.filename_et_daily)
        det.columns = [str(c).strip() for c in det.columns]
        date_col_et = self._pick_date_col(det)
        det[date_col_et] = self._parse_date_series(det[date_col_et])
        det = det.dropna(subset=[date_col_et])
        et_col = self._pick_et_col(det)
        det = det[[date_col_et, et_col]].rename(columns={date_col_et: "date", et_col: "ET"})

        # ---- Controls ----
        dc = self.load_controls_z()
        ctrl_cols = [c for c in dc.columns if c != "date"]
        dc = dc[["date"] + ctrl_cols]

        # ---- Industry xlsx: premium & mom_1w ----
        premium_df = self.read_industry_col(self.sheet_premium, cfg.premium_col_name_in_xlsx, "premium")
        mom1w_df   = self.read_industry_col(self.sheet_mom1w,   cfg.mom1w_col_name_in_xlsx,   "mom_1w")

        # ---- Base merge ----
        df = dy.merge(det, on="date", how="outer") \
               .merge(dc,  on="date", how="outer") \
               .merge(premium_df, on="date", how="outer") \
               .merge(mom1w_df,   on="date", how="outer")

        # ---- OI ----
        doi = self._read_csv_any(self.p_new / cfg.filename_oi_daily)
        doi.columns = [str(c).strip() for c in doi.columns]
        date_col_oi = self._pick_date_col(doi)
        doi[date_col_oi] = self._parse_date_series(doi[date_col_oi])
        doi = doi.dropna(subset=[date_col_oi])

        oi_col = self._pick_oi_col(doi)
        doi = doi.groupby(date_col_oi, as_index=False)[oi_col].mean()
        doi = doi.rename(columns={date_col_oi: "date", oi_col: "OI"})
        df = df.merge(doi, on="date", how="outer")

        # ---- D ----
        dD = self._read_csv_any(self.p_new / cfg.filename_D_single)
        dD.columns = [str(c).strip() for c in dD.columns]
        date_col_D = self._pick_date_col(dD)
        dD[date_col_D] = self._parse_date_series(dD[date_col_D])
        dD = dD.dropna(subset=[date_col_D])

        if "industry" in dD.columns:
            dD["industry"] = dD["industry"].astype(str).str.strip()
            dD = dD[dD["industry"].eq(str(cfg.industry_name_for_D))]

        if self.d_col not in dD.columns:
            raise ValueError(f"{cfg.filename_D_single} 中找不到列：{self.d_col}；实际列：{dD.columns.tolist()}")

        dD = dD[[date_col_D, self.d_col]].rename(columns={date_col_D: "date", self.d_col: "D"})
        df = df.merge(dD, on="date", how="left")

        # ---- sort & missing report ----
        df = df.sort_values("date").reset_index(drop=True)
        na_report = pd.concat(
            [df.isna().sum().rename("na_count"), df.isna().mean().rename("na_ratio")],
            axis=1
        )

        # ---- export ----
        out_merged = self.p_final / f"{cfg.industry_tag}_merged_panel.csv"
        out_na     = self.p_final / f"{cfg.industry_tag}_merged_missing_report.csv"
        df.to_csv(out_merged, index=False, encoding="utf-8-sig")
        na_report.to_csv(out_na, encoding="utf-8-sig")

        out_trading = None
        if export_trading_only:
            out_trading = self.p_final / f"{cfg.industry_tag}_merged_panel_trading.csv"
            df_trading = df.dropna(subset=["Y"]).sort_values("date").reset_index(drop=True)
            df_trading.to_csv(out_trading, index=False, encoding="utf-8-sig")

        self._log("合并完成：", cfg.industry_tag, out_merged)
        if out_trading:
            self._log("交易日面板：", out_trading)

        return df, na_report, out_merged, out_trading

    # 批量
    def run_batch(self, configs: Sequence[IndustryConfig], *, export_trading_only: bool = True) -> List[Tuple[str, Path, Optional[Path]]]:
        results = []
        for cfg in configs:
            _, _, merged_path, trading_path = self.merge_panel(cfg, export_trading_only=export_trading_only)
            results.append((cfg.industry_tag, merged_path, trading_path))
        return results


输出整合好的数据

In [11]:
pipe = DataPipeline(
    p_base=BASE_DIR,
    prefer_halflife="7",      # ET/OI 优先用 *_ema_hl7
    d_col="D_total_norm",
    verbose=True
)
configs = [
    IndustryConfig(
        industry_tag="文娱",
        industry_name_for_D="文娱",
        etf_col_name_in_ywide="华夏中证文娱传媒ETF",
        premium_col_name_in_xlsx="华夏中证文娱传媒ETF",
        mom1w_col_name_in_xlsx="华夏中证文娱传媒ETF",
        filename_et_daily="ET_daily_文娱传媒.csv",
        filename_oi_daily="OI_daily_文娱传媒_simple.csv",
        filename_D_single="policy_D_daily__文娱.csv",
    ),
    # 其余行业照抄一份改字段即可
    IndustryConfig(
        industry_tag="动漫",
        industry_name_for_D="动漫",
        etf_col_name_in_ywide="华夏中证动漫游戏ETF",
        premium_col_name_in_xlsx="华夏中证动漫游戏",
        mom1w_col_name_in_xlsx="华夏中证动漫游戏ETF",
        filename_et_daily="ET_daily_动漫.csv",
        filename_oi_daily="OI_daily_动漫_simple.csv",
        filename_D_single="policy_D_daily__动漫游戏.csv",
    ),
    IndustryConfig(
        industry_tag="家电",
        industry_name_for_D="家电",
        etf_col_name_in_ywide="国泰中证全指家用电器ETF",
        premium_col_name_in_xlsx="国泰中证全指家用电器ETF",
        mom1w_col_name_in_xlsx="国泰中证全指家用电器ETF",
        filename_et_daily="ET_daily_家电.csv",
        filename_oi_daily="OI_daily_家电_simple.csv",
        filename_D_single="policy_D_daily__家用电器.csv",
    ),
    IndustryConfig(
        industry_tag="汽车",
        industry_name_for_D="文娱",
        etf_col_name_in_ywide="国泰中证800汽车与零部件ETF",
        premium_col_name_in_xlsx="国泰中证800汽车与零部件ETF",
        mom1w_col_name_in_xlsx="国泰中证800汽车与零部件ETF",
        filename_et_daily="ET_daily_文娱传媒.csv",
        filename_oi_daily="OI_daily_文娱传媒_simple.csv",
        filename_D_single="policy_D_daily__文娱.csv",
    ),
    IndustryConfig(
        industry_tag="餐饮",
        industry_name_for_D="餐饮",
        etf_col_name_in_ywide="华宝中证细分食品饮料产业主题ETF",
        premium_col_name_in_xlsx="华宝中证细分食品饮料产业主题ETF",
        mom1w_col_name_in_xlsx="华宝中证细分食品饮料产业主题ETF",
        filename_et_daily="ET_daily_食品饮料.csv",
        filename_oi_daily="OI_daily_食品_simple.csv",
        filename_D_single="policy_D_daily__餐饮.csv",
    ),
    IndustryConfig(
        industry_tag="旅游",
        industry_name_for_D="旅游",
        etf_col_name_in_ywide="富国中证旅游主题ETF",
        premium_col_name_in_xlsx="富国中证旅游主题ETF",
        mom1w_col_name_in_xlsx="富国中证旅游主题ETF",
        filename_et_daily="ET_daily_旅游.csv",
        filename_oi_daily="OI_daily_旅游_simple.csv",
        filename_D_single="policy_D_daily__旅游.csv",
    )
]

res = pipe.run_batch(configs, export_trading_only=True)
for tag, merged_path, trading_path in res:
    print(tag, merged_path, trading_path)


合并完成： 文娱 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\文娱_merged_panel.csv
交易日面板： C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\文娱_merged_panel_trading.csv
合并完成： 动漫 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\动漫_merged_panel.csv
交易日面板： C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\动漫_merged_panel_trading.csv
合并完成： 家电 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\家电_merged_panel.csv
交易日面板： C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\家电_merged_panel_trading.csv
合并完成： 汽车 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\汽车_merged_panel.csv
交易日面板： C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\汽车_merged_panel_trading.csv
合并完成： 餐饮 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\餐饮_merged_panel.csv
交易日面板： C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\餐饮_merged_panel_trading.csv
合并完成： 旅游 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\旅游_merged_panel.csv
交易日面板： C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\旅游_merged_panel_trading.csv
文娱 C:\Users\17514\Desktop\研一\统计基础\作业\大作业\整合数据\文娱_merged_panel.csv C:\Users\17514\Desktop\研一\统计基础\作业\