## Task 3: Parts T16 in Registered Vehicles (Adelshofen)

In [None]:
# === Cell 1：环境 & 预览（修复版）===
import os, re, csv
from pathlib import Path
import pandas as pd
import numpy as np

# (1) Paths — adjust the two path assignments below to match your local setup.
# These assignments belong in cell 1 (the fixed version of the cell).
BASE_DIR = Path(r"D:\IDA\Case_Study_IDA_Group11")   # project root directory
DATA_DIR = BASE_DIR / "Data"                        # data directory (note the capital "D")
OUT_DIR = BASE_DIR / "outputs"
# Create the output directory (and any missing parents); no error if it already exists.
OUT_DIR.mkdir(exist_ok=True, parents=True)

# Echo the configured paths and whether the data directory exists on disk.
print(f"BASE_DIR = {BASE_DIR}")
print(f"DATA_DIR = {DATA_DIR}  | 存在? {DATA_DIR.exists()}")

def standardize_colnames(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize the column labels of *df* to lowercase snake_case, in place.

    Each label is converted to ``str``, stripped of surrounding whitespace,
    has every whitespace run replaced by a single underscore, loses every
    character outside ``[0-9a-zA-Z_]``, and is lowercased.

    Args:
        df: Frame whose ``columns`` attribute is overwritten.

    Returns:
        The same ``df`` object (mutated), so calls can be chained.
    """
    # Cast to str first: the ``.str`` accessor raises AttributeError on
    # all-numeric labels (e.g. a RangeIndex from a header-less read) and
    # silently turns non-string labels of a mixed index into NaN.
    labels = df.columns.astype(str)
    df.columns = (labels
                    .str.strip()
                    .str.replace(r"\s+", "_", regex=True)
                    .str.replace(r"[^0-9a-zA-Z_]", "", regex=True)
                    .str.lower())
    return df

def sniff_delimiter(sample_text: str) -> str:
    """Guess the field delimiter used in *sample_text*.

    ``csv.Sniffer`` is asked first, restricted to comma, semicolon, tab,
    pipe and space. If sniffing fails for any reason, the first of tab,
    semicolon and pipe that occurs anywhere in the sample is returned;
    a comma is the last resort.
    """
    try:
        return csv.Sniffer().sniff(sample_text, delimiters=",;\t| ").delimiter
    except Exception:
        # Fallback: plain substring probing, in fixed priority order.
        for candidate in ("\t", ";", "|"):
            if candidate in sample_text:
                return candidate
        return ","

def detect_encoding(path: Path, encs=("utf-8","utf-8-sig","gbk","latin1")) -> str:
    """Return the first candidate encoding that decodes the start of *path*.

    Each encoding in *encs* is tried in order by strictly decoding the
    first 2048 characters of the file. Only that prefix is checked, so a
    file may still contain undecodable bytes further in.

    Args:
        path: File to probe.
        encs: Candidate encoding names, tried in the given order.

    Returns:
        The name of the first encoding that decodes the prefix, or
        ``"utf-8"`` when none does (including when the file cannot be
        opened at all).
    """
    for enc in encs:
        try:
            with open(path, "r", encoding=enc, errors="strict") as f:
                head = f.read(2048)
        except (OSError, UnicodeError, LookupError):
            # Unreadable file, bytes invalid for this codec, or unknown
            # codec name: move on to the next candidate.
            continue
        # Plain UTF-8 decodes a byte-order mark successfully (as U+FEFF), so
        # a later "utf-8-sig" candidate would never be reached. Prefer the
        # BOM-aware codec when the caller listed it, so the mark is stripped
        # instead of leaking into the first header name.
        if (head.startswith("\ufeff")
                and enc.lower().replace("_", "-") in ("utf-8", "utf8")
                and "utf-8-sig" in encs):
            return "utf-8-sig"
        return enc
    return "utf-8"

def peek_file(path: Path, n=5):
    """Read the first *n* rows of a data file and summarize what was found.

    ``.csv``/``.txt`` files have their encoding and delimiter detected
    before being read with ``pd.read_csv``; ``.xls``/``.xlsx`` files are
    read with ``pd.read_excel``. Any other extension is reported as skipped.

    Args:
        path: File to inspect.
        n: Maximum number of rows to read.

    Returns:
        A dict with keys ``file``, ``ext``, ``rows_peeked``, ``columns``,
        ``delimiter``, ``encoding`` and ``error``. Failures while reading
        are not raised; they are recorded in ``error`` as
        ``"<ExceptionType>: <message>"``.
    """
    suffix = path.suffix.lower()
    info = {"file": str(path), "ext": suffix,
            "rows_peeked": 0, "columns": None,
            "delimiter": None, "encoding": None, "error": None}
    try:
        if suffix in (".csv", ".txt"):
            encoding = detect_encoding(path)
            # Sniff on a lenient decode of the first 64K characters so stray
            # bytes do not abort delimiter detection.
            with open(path, "r", encoding=encoding, errors="ignore") as handle:
                sample = handle.read(65536)
                separator = sniff_delimiter(sample)
            frame = pd.read_csv(path, nrows=n, encoding=encoding,
                                sep=separator, low_memory=False)
            info.update(rows_peeked=len(frame), columns=list(frame.columns),
                        delimiter=separator, encoding=encoding)
        elif suffix in (".xls", ".xlsx"):
            frame = pd.read_excel(path, nrows=n, engine=None)
            # Excel files have no text delimiter/encoding; use fixed markers.
            info.update(rows_peeked=len(frame), columns=list(frame.columns),
                        delimiter="excel", encoding="binary")
        else:
            info["error"] = "跳过：不支持的扩展名"
    except Exception as exc:
        info["error"] = f"{type(exc).__name__}: {exc}"
    return info

# (2) Collect data files (recursing into sub-folders of DATA_DIR).
patterns = ("*.csv","*.txt","*.xls","*.xlsx")
all_files = []
for pat in patterns:
    all_files.extend(DATA_DIR.rglob(pat))

print(f"共找到 {len(all_files)} 个数据文件（支持后缀：csv/txt/xls/xlsx）")

if not all_files:
    print("❌ 没找到任何数据文件。请检查：\n"
          "  1) BASE_DIR/DATA_DIR 路径是否正确；\n"
          "  2) 文件是否放在 data 文件夹或其子文件夹；\n"
          "  3) 文件后缀是否为 csv/txt/xls/xlsx。\n"
          "修正路径后重新运行本单元格即可。")
    # Build an empty frame with the expected columns so the column
    # selection below does not raise KeyError.
    catalog_df = pd.DataFrame(columns=["file","ext","encoding","delimiter","rows_peeked","columns","error"])
else:
    # One summary dict per file; read errors are recorded per row, not raised.
    catalog = [peek_file(p, n=5) for p in all_files]
    catalog_df = pd.DataFrame(catalog)
    if "file" in catalog_df.columns:
        catalog_df = catalog_df.sort_values("file")

display_cols = ["file","ext","encoding","delimiter","rows_peeked","columns","error"]
# Use the rich notebook renderer when IPython is available and fall back to
# plain text otherwise. Only a missing IPython triggers the fallback, so real
# errors (and Ctrl-C) are no longer swallowed by a bare ``except``.
try:
    from IPython.display import display
except ImportError:
    print(catalog_df[display_cols].to_string(index=False))
else:
    display(catalog_df[display_cols])

# (3) Export the preview (only when at least one file was catalogued).
preview_path = OUT_DIR / "Task3_catalog_preview.json"
if len(catalog_df) > 0:
    catalog_df.to_json(preview_path, force_ascii=False, orient="records", indent=2)
    print("预览已导出：", preview_path)
else:
    print("当前无可导出的预览（因为未找到数据文件）。")


BASE_DIR = D:\IDA\Case_Study_IDA_Group11
DATA_DIR = D:\IDA\Case_Study_IDA_Group11\Data  | 存在? True
共找到 89 个数据文件（支持后缀：csv/txt/xls/xlsx）


In [None]:
# === Cell 2: configure JOIN_PLAN ===

# How to write an entry (important):
# - name:     internal name for the table (arbitrary, but must be unique)
# - pattern:  glob pattern matched against file names (* is a wildcard), e.g. "*Einzelteil*.*"
# - usecols:
#    · a list ["col1","col2"] → read only these columns, names kept as-is
#    · a dict {"old_name":"new_name"} → read only these columns and rename them to unified names (recommended)
# - keys:     key column(s) used for merging (must exist on both sides; may be several, e.g. ["id"] or ["id","date"])
# - how:      merge type, default "left" (like a SQL LEFT JOIN)

JOIN_PLAN = [
    # Example 1: main table (Einzelteil = single-part table)
    # NOTE(review): the notebook heading says "Parts T16" but this pattern
    # matches "Einzelteil_T23" — confirm which part file is intended.
    {
        "name": "einzelteil",
        "pattern": "*Einzelteil_T23*.*",     # matches files whose name contains "Einzelteil_T23"
        "usecols": {                        # example: rename raw columns to lowercase snake_case
            "ID": "teil_id",
            "Name": "teil_name",
            "Kategorie": "kategorie"
        },
        "keys": ["teil_id"],                 # primary key
        "how": "left"
    },

    # Example 2: Komponente (component) table
    {
        "name": "komponente",
        "pattern": "*Komponente*.*",
        "usecols": {
            "KomponentenID": "teil_id",      # corresponds to the part ID; renamed to match the main table
            "Beschreibung": "beschreibung"
        },
        "keys": ["teil_id"],
        "how": "left"
    },

    # Example 3: Fahrzeug (vehicle) table
    {
        "name": "fahrzeug",
        "pattern": "*Fahrzeug*.*",
        "usecols": {
            "FahrzeugID": "fahrzeug_id",
            "TeilID": "teil_id"              # note: this column refers to the part
        },
        "keys": ["teil_id"],
        "how": "left"
    },

    # Further tables (Geodaten, Logistikverzug, Zulassungen, ...) can be added here.
]

# Whether to use automatic mode (only triggered when JOIN_PLAN is empty).
AUTO_MODE = (len(JOIN_PLAN) == 0)

# Whether to sample rows (useful while debugging; set to None for the full run).
SAMPLE_ROWS_PER_FILE = 100000
print("✅ JOIN_PLAN 配置已加载，共定义", len(JOIN_PLAN), "个表。AUTO_MODE =", AUTO_MODE)
