In [None]:
# ===================================================================
#           数据指挥中心仪表盘 (Data QA Dashboard) v3.0
# ===================================================================
#
# 目的: 自动化盘点所有已下载的数据，并提供手动深度钻取入口。
#
# -------------------------------------------------------------------

# 1. 导入必要的库
import pandas as pd
import yaml
from pathlib import Path
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from tqdm.notebook import tqdm  # 使用 notebook 版本的 tqdm
from tabulate import tabulate
from datetime import datetime

# 2. 加载配置
try:
    with open('config.yaml', 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
    BASE_PATH = Path(config['storage']['base_path'])
    print(f"✅ 成功加载配置，数据根目录: {BASE_PATH.resolve()}")
except FileNotFoundError:
    print("❌ 未找到 config.yaml 文件。")
    BASE_PATH = Path("data")

# --- 3. 核心功能一：自动数据盘点 ---
def create_data_inventory(base_path: Path) -> pd.DataFrame:
    """
    扫描数据目录，生成所有已下载文件的元数据盘点报告。
    """
    print("\n" + "="*20, "开始自动数据盘点", "="*20)
    
    if not base_path.exists():
        print(f"  - 目录 '{base_path}' 不存在。无法进行盘点。")
        return pd.DataFrame()

    # 使用 rglob 递归查找所有 data.parquet 文件
    all_parquet_files = list(base_path.rglob("data.parquet"))
    inventory_records = []
    
    # 使用 tqdm 创建进度条
    for file_path in tqdm(all_parquet_files, desc="扫描数据文件"):
        try:
            # 从路径中解析 data_type 和 entity_id
            entity_id = file_path.parent.name.split('=')[1]
            data_type = file_path.parent.parent.name
            
            record = {
                'data_type': data_type,
                'entity_id': entity_id
            }
            
            # 读取文件以获取详细信息
            df = pd.read_parquet(file_path)
            record['row_count'] = len(df)
            record['column_count'] = len(df.columns)
            
            # 智能查找日期列
            date_col = None
            common_date_cols = ['trade_date', 'ann_date', 'cal_date', 'end_date']
            for col in common_date_cols:
                if col in df.columns:
                    date_col = col
                    break
            
            if date_col:
                # 确保日期列是字符串格式，以安全地获取min/max
                df[date_col] = df[date_col].astype(str)
                record['start_date'] = df[date_col].min()
                record['end_date'] = df[date_col].max()
            else:
                record['start_date'] = 'N/A'
                record['end_date'] = 'N/A'

            # 获取文件系统信息
            record['file_size_kb'] = round(file_path.stat().st_size / 1024, 2)
            record['last_modified'] = datetime.fromtimestamp(file_path.stat().st_mtime).strftime('%Y-%m-%d %H:%M')
            
            inventory_records.append(record)
        except Exception as e:
            print(f"\n处理文件 {file_path} 时出错: {e}")
    
    print("数据盘点完成。")
    if not inventory_records:
        return pd.DataFrame()
        
    # 转换为 DataFrame 并排序
    inventory_df = pd.DataFrame.from_records(inventory_records)
    inventory_df.sort_values(by=['data_type', 'entity_id'], inplace=True)
    return inventory_df

# --- 4. 核心功能二：手动深度钻取 ---
def deep_dive_stock(symbol: str, base_path: Path):
    """对单个股票进行深度的数据和图表分析。"""
    print("\n" + "="*20, f"对股票 {symbol} 进行深度钻取", "="*20)
    
    data_frames = {}
    # 动态查找该 symbol 存在的所有数据类型
    symbol_dirs = list(base_path.glob(f"*/entity={symbol}"))
    
    if not symbol_dirs:
        print(f"未找到股票 {symbol} 的任何数据目录。")
        return

    # 加载数据
    print("\n[数据文件加载与预览]")
    for symbol_dir in symbol_dirs:
        data_type = symbol_dir.parent.name
        file_path = symbol_dir / "data.parquet"
        print(f"\n--- 检查: {data_type} (文件: {file_path}) ---")
        if file_path.exists():
            df = pd.read_parquet(file_path)
            df['trade_date_dt'] = pd.to_datetime(df['trade_date'], format='%Y%m%d')
            data_frames[data_type] = df
            print(f"  ✅ 已加载 {len(df)} 条记录。最近5条:")
            print(tabulate(df[['trade_date', 'open', 'high', 'low', 'close', 'vol']].tail(5), headers='keys', tablefmt='psql', showindex=False))
        else:
            print(f"  ❌ 未找到数据文件。")

    if not data_frames: return

    # 绘制图表
    print("\n[图表对比]")
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.03,
                        subplot_titles=('收盘价对比 (close)', '成交量 (vol)'), row_heights=[0.8, 0.2])
    for name, df in data_frames.items():
        fig.add_trace(go.Scatter(x=df['trade_date_dt'], y=df['close'], name=f'收盘价 ({name})'), row=1, col=1)
    
    volume_source = None
    if 'daily_none' in data_frames: volume_source = 'daily_none'
    elif data_frames: volume_source, _ = next(iter(data_frames.items()))
        
    if volume_source:
        df_vol = data_frames[volume_source]
        fig.add_trace(go.Scatter(x=df_vol['trade_date_dt'], y=df_vol['vol'], name=f'成交量 ({volume_source})',
                                 fill='tozeroy', mode='lines', line=dict(width=0.5, color='rgba(44, 160, 44, 0.5)')),
                      row=2, col=1)
        if volume_source != 'daily_none':
             print(f"  - 提示: 未找到'不复权'数据，成交量图表使用'{volume_source}'数据绘制。")

    fig.update_yaxes(title_text="价格", row=1, col=1)
    fig.update_yaxes(title_text="成交量", tickformat=".2s", row=2, col=1)
    fig.update_layout(title_text=f'股票 {symbol} - 不同复权类型数据对比',
                      xaxis_rangeslider_visible=False, height=700,
                      legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1))
    fig.show()


# --- 5. 执行与展示 ---

# 执行自动盘点并展示结果
inventory_report = create_data_inventory(BASE_PATH)

if not inventory_report.empty:
    print("\n" + "="*20, "数据盘点汇总报告", "="*20)
    print("这是您本地数据仓库的完整快照。")
    # 设置Pandas显示选项，以便能看到所有行和列
    pd.set_option('display.max_rows', 200)
    display(inventory_report)

    print("\n--- 异常数据筛选：记录数少于200的实体 (可能不完整) ---")
    display(inventory_report[inventory_report['row_count'] < 200])

    print("\n--- 异常数据筛选：最新数据在30天前的实体 (可能已停止更新) ---")
    thirty_days_ago = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d')
    stale_data = inventory_report[inventory_report['end_date'] < thirty_days_ago]
    display(stale_data)
else:
    print("\n数据目录为空或未发现任何 Parquet 文件。")

# 提供手动深度钻取入口
print("\n" + "="*20, "手动深度钻取入口", "="*20)
print("下方代码将对单个股票进行详细分析。您可以修改 SYMBOL_TO_VERIFY 的值来检查不同的股票。")

# ----> 在这里输入您想验证的股票代码 <----
SYMBOL_TO_VERIFY = "600519.SH" 
deep_dive_stock(SYMBOL_TO_VERIFY, BASE_PATH)