In [28]:
import pandas as pd
import numpy as np
import os
from pathlib import Path
import warnings
from datetime import datetime, timedelta, date

# Optional dependency: try to import the chinese_calendar holiday helpers.
# If the package is missing, `get_holiday_detail` and `Holiday` stay undefined;
# the holiday helpers further down catch the resulting error, so statutory
# holidays are then NOT detected and only the hard-coded date rules apply.
try:
    from chinese_calendar import get_holiday_detail, Holiday
except ImportError:
    pass 

# ==============================================================================
# 1. 配置层 (Configuration Layer)
# ==============================================================================
class Config:
    """Static settings for the report: paths, analysis window, industry tables.

    Every attribute is a class-level constant read directly as ``Config.X``;
    the class is never instantiated in this file.
    """

    # --- File paths ---
    # Workbook read by main() and the directory the report is written to.
    INPUT_FILE = r"E:\A智网\月度电力需求预测报告\0_数据\湖北省行业日用电量_26.1.22.xlsx"
    OUTPUT_DIR = r"E:\A智网\月度电力需求预测报告\1_分析结果"
    
    # --- Analysis window ---
    # Inclusive current period; main() shifts it back 1, 2 and 3 years.
    CURRENT_START = '2026-01-01'
    CURRENT_END = '2026-01-22'
    # 'YYYY-MM' month used as the month-over-month comparison base.
    BASE_MONTH_STR = '2025-12' 
    
    # --- Industry definitions (row number: alias) ---
    # Keys are matched against the '序号' column of the input workbook; values
    # are the short aliases used everywhere else in this configuration.
    INDUSTRY_DEFINITIONS = {
        1: "售电量", 2: "全行业", 3: "第一产业", 4: "第二产业", 5: "第三产业", 6: "居民生活",
        17: "工业", 
        # Energy-intensive industries
        41: "化工", 53: "建材", 58: "钢铁", 61: "有色",
        # High-tech industries
        46: "医药", 65: "金属制品", 67: "通用设备", 69: "专用设备", 71: "汽车", 
        73: "运输设备", 77: "电气机械", 79: "计算机通信", 81: "仪器仪表",
        # Consumer goods
        26: "农副食品", 27: "食品制造", 28: "酒饮料", 29: "烟草制品", 30: "纺织", 
        31: "纺织服装", 32: "皮革毛革", 33: "木材加工", 34: "家具制造", 35: "造纸", 
        36: "印刷记录", 37: "文工体娱乐",
        # Tertiary-industry breakdown
        98: "交通运输", 110: "信息传输", 112: "互联网和相关服务", 115: "批发零售", 
        116: "充换电服务", 117: "住宿餐饮", 118: "金融业", 119: "房地产", 
        120: "租赁和商务服务", 122: "公共服务", 125: "其中：科技推广和应用服务业", 
        130: "教育、文化、体育和娱乐业", 132: "卫生和社会工作"
    }

    # --- Report sheet layout ---
    # One dict per output sheet:
    #   name          - Excel sheet name
    #   type          - 'standard' adds month-over-month columns,
    #                   'trend' adds the year-before-last growth column
    #   base_industry - alias whose year-over-year change is the denominator
    #                   of the contribution-rate column
    #   rows          - aliases (or aggregation names) listed top to bottom
    SHEETS = [
        {
            'name': '全行业概览',
            'type': 'standard',
            'base_industry': '售电量',
            'rows': ['售电量', '全行业', '第一产业', '第二产业', '工业', '第三产业', '居民生活']
        },
        {
            'name': '高耗能行业',
            'type': 'standard',
            'base_industry': '工业',
            'rows': ['二产', '工业', '高耗能', '化工', '建材', '钢铁', '有色']
        },
        {
            'name': '高技术行业',
            'type': 'standard',
            'base_industry': '工业',
            'rows': ['二产', '工业', '高技术', '医药', '金属制品', '通用设备', '专用设备', '汽车', '运输设备', '电气机械', '计算机通信', '仪器仪表']
        },
        {
            'name': '消费品制造',
            'type': 'standard',
            'base_industry': '工业',
            'rows': ['二产', '工业', '消费品制造', '农副食品', '食品制造', '酒饮料', '烟草制品', '纺织', '纺织服装', '皮革毛革', '木材加工', '家具制造', '造纸', '印刷记录', '文工体娱乐']
        },
        {
            'name': '第三产业细分',
            'type': 'trend',
            'base_industry': '第三产业',
            'rows': ['第三产业', '交通运输', '信息传输', '互联网和相关服务', '批发零售', '充换电服务', '住宿餐饮', '金融业', '房地产', '租赁和商务服务', '公共服务', '其中：科技推广和应用服务业', '教育、文化、体育和娱乐业', '卫生和社会工作']
        }
    ]

    # --- Aggregation rules ---
    # Row name -> list of aliases whose values are summed to produce that row.
    AGGREGATION_RULES = {
        '售电量': ['售电量'],
        '二产': ['第二产业'],
        '高耗能': ['化工', '建材', '钢铁', '有色'],
        '高技术': ['医药', '金属制品', '通用设备', '专用设备', '汽车', '运输设备', '电气机械', '计算机通信', '仪器仪表'],
        '消费品制造': ['农副食品', '食品制造', '酒饮料', '烟草制品', '纺织', '纺织服装', '皮革毛革', '木材加工', '家具制造', '造纸', '印刷记录', '文工体娱乐']
    }

    # --- Hard-coded Spring Festival dates (first day of the lunar new year) ---
    # Year -> 'YYYY-MM-DD'; the holiday helpers exclude a window around each date.
    SPRING_FESTIVAL_DATES = {
        2022: '2022-02-01',
        2023: '2023-01-22',
        2024: '2024-02-10',
        2025: '2025-01-29',
        2026: '2026-02-17'
    }

# Make sure the output directory exists (runs as a side effect of executing this cell/module).
os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
# Silence every Python warning for the rest of the process (global side effect).
warnings.filterwarnings('ignore')

# ==============================================================================
# 2. 核心逻辑层
# ==============================================================================

def generate_exclusion_report(df_raw, spring_festival_dates=None, holiday_lookup=None):
    """
    Build the audit table of dates that match a holiday-exclusion rule.

    Every column of ``df_raw`` whose name is an 8-digit ``YYYYMMDD`` string is
    treated as a date and checked against these rules:

    * statutory holiday according to ``holiday_lookup``
    * 1 January (always)
    * the Spring Festival day itself (always)
    * the 13 days before and the 15 days after each Spring Festival day
    * 28-30 September of every year present in the data

    Args:
        df_raw: Wide table with one column per day named ``YYYYMMDD``.
        spring_festival_dates: Mapping ``year -> 'YYYY-MM-DD'`` of Spring
            Festival days. Defaults to ``Config.SPRING_FESTIVAL_DATES``.
        holiday_lookup: Callable taking a ``datetime.date`` and returning
            ``(is_holiday, holiday_name)``. Defaults to
            ``chinese_calendar.get_holiday_detail`` when that import worked.

    Returns:
        DataFrame with columns '日期', '星期', '剔除原因', one row per excluded
        date in column order; empty (but with those columns) if nothing matched.
    """
    print("正在生成剔除日期明细报告...")

    if spring_festival_dates is None:
        spring_festival_dates = Config.SPRING_FESTIVAL_DATES
    if holiday_lookup is None:
        try:
            holiday_lookup = get_holiday_detail
        except NameError:
            # The optional chinese_calendar import failed; say so instead of
            # silently treating every day as a non-holiday.
            print("  -> [警告] chinese_calendar 不可用，已跳过法定节假日识别，仅按硬编码规则剔除。")

    date_cols = [str(c) for c in df_raw.columns if str(c).isdigit() and len(str(c)) == 8]
    all_dates = pd.to_datetime(date_cols, format='%Y%m%d')

    # Hard-coded windows around each Spring Festival day.
    spring_days = set()
    pre_spring_dates = set()
    post_spring_dates = set()
    for date_str in spring_festival_dates.values():
        spring_start = pd.Timestamp(date_str)
        spring_days.add(spring_start)
        pre_spring_dates.update(
            pd.date_range(spring_start - pd.Timedelta(days=13), spring_start - pd.Timedelta(days=1))
        )
        post_spring_dates.update(
            pd.date_range(spring_start + pd.Timedelta(days=1), spring_start + pd.Timedelta(days=15))
        )

    # The three days before National Day (28-30 September) of every data year.
    pre_national_dates = set()
    for year in all_dates.year.unique():
        pre_national_dates.update(
            pd.date_range(pd.Timestamp(year=int(year), month=9, day=28), periods=3)
        )

    # Indexed by Timestamp.weekday() (Monday == 0); independent of the process
    # locale, unlike strftime('%A').
    weekday_labels = ('周一', '周二', '周三', '周四', '周五', '周六', '周日')

    exclusion_list = []
    lookup_failures = 0
    for dt in all_dates:
        reason = []

        # A. statutory holiday according to the calendar library
        if holiday_lookup is not None:
            try:
                _, holiday_name = holiday_lookup(dt.date())
            except Exception:
                # Best effort: the library rejects dates it has no data for
                # (e.g. years newer than the installed release). Count the
                # failure and fall back to the hard-coded rules below.
                lookup_failures += 1
            else:
                if holiday_name is not None:
                    reason.append(f"法定节假日: {holiday_name}")

        # B. New Year's Day is always excluded
        if dt.month == 1 and dt.day == 1:
            reason.append("元旦(强制)")

        # C. Spring Festival day itself, so it is excluded even when the
        #    library lookup is unavailable for that year
        if dt in spring_days:
            reason.append("春节当天(强制)")

        # D. run-up to Spring Festival
        if dt in pre_spring_dates:
            reason.append("春节前10天+")

        # E. Spring Festival holiday and the days after it
        if dt in post_spring_dates:
            reason.append("春节后8天+")

        # F. days before National Day
        if dt in pre_national_dates:
            reason.append("国庆前3天")

        if reason:
            exclusion_list.append({
                '日期': dt.strftime('%Y-%m-%d'),
                '星期': weekday_labels[dt.weekday()],
                '剔除原因': " & ".join(reason)
            })

    if lookup_failures:
        print(f"  -> [警告] 有 {lookup_failures} 个日期无法查询法定节假日，已仅按硬编码规则处理。")

    return pd.DataFrame(exclusion_list, columns=['日期', '星期', '剔除原因'])

def filter_statutory_holidays(df, spring_festival_dates=None, holiday_lookup=None):
    """
    Drop rows whose date is a holiday or falls in a hard-coded exclusion window.

    A row is removed when its 'date' is any of:

    * a statutory holiday according to ``holiday_lookup``
    * 1 January
    * within 13 days before to 15 days after a Spring Festival day
      (the festival day itself included)
    * 28-30 September of any year present in ``df``

    Args:
        df: Long table with a datetime64 'date' column.
        spring_festival_dates: Mapping ``year -> 'YYYY-MM-DD'`` of Spring
            Festival days. Defaults to ``Config.SPRING_FESTIVAL_DATES``.
        holiday_lookup: Callable taking a ``datetime.date`` and returning
            ``(is_holiday, holiday_name)``. Defaults to
            ``chinese_calendar.get_holiday_detail`` when that import worked.

    Returns:
        A copy of ``df`` without the excluded rows; ``df`` itself when empty.
    """
    if df.empty:
        return df

    if spring_festival_dates is None:
        spring_festival_dates = Config.SPRING_FESTIVAL_DATES
    if holiday_lookup is None:
        try:
            holiday_lookup = get_holiday_detail
        except NameError:
            # The optional chinese_calendar import failed; say so instead of
            # silently treating every day as a non-holiday.
            print("  -> [警告] chinese_calendar 不可用，已跳过法定节假日识别，仅按硬编码规则剔除。")

    excluded_dates = set()

    # 1. Statutory holidays. The table is long (one row per industry per day),
    #    so query the library once per distinct date rather than once per row.
    lookup_failures = 0
    if holiday_lookup is not None:
        for ts in pd.DatetimeIndex(df['date'].unique()):
            try:
                _, holiday_name = holiday_lookup(ts.date())
            except Exception:
                # Best effort: the library rejects dates it has no data for;
                # such dates are then covered by the hard-coded rules only.
                lookup_failures += 1
                continue
            if holiday_name is not None:
                excluded_dates.add(ts)
    if lookup_failures:
        print(f"  -> [警告] 有 {lookup_failures} 个日期无法查询法定节假日，已仅按硬编码规则处理。")

    # 2. Spring Festival window: 13 days before through 15 days after, with the
    #    festival day itself included so it is dropped even without the library.
    for date_str in spring_festival_dates.values():
        spring_start = pd.Timestamp(date_str)
        excluded_dates.update(
            pd.date_range(spring_start - pd.Timedelta(days=13), spring_start + pd.Timedelta(days=15))
        )

    # 3. The three days before National Day for every year in the data.
    for year in df['date'].dt.year.unique():
        excluded_dates.update(
            pd.date_range(pd.Timestamp(year=int(year), month=9, day=28), periods=3)
        )

    # 4. New Year's Day is always excluded, whatever the library says.
    mask_new_year = (df['date'].dt.month == 1) & (df['date'].dt.day == 1)

    final_mask = df['date'].isin(list(excluded_dates)) | mask_new_year

    # --- Debug log: how many distinct days were removed from this window ---
    start_str = df['date'].min().strftime('%Y-%m-%d')
    end_str = df['date'].max().strftime('%Y-%m-%d')

    if final_mask.any():
        removed_days = df.loc[final_mask, 'date'].nunique()
        print(f"  -> [节假日剔除] {start_str} 至 {end_str}: 剔除 {removed_days} 天。")
    else:
        print(f"  -> [节假日剔除] {start_str} 至 {end_str}: 未剔除 (0天)。")

    return df[~final_mask].copy()

def calculate_period_daily_avg(df_long, start_date, end_date):
    """Return the per-industry average daily load over an inclusive date window.

    Rows outside ``[start_date, end_date]`` are discarded, the remaining rows
    are passed through ``filter_statutory_holidays``, and each industry's
    summed 'load' is divided by the number of distinct dates that survived.
    An empty Series is returned whenever no rows or no dates are left.
    """
    in_window = df_long['date'].between(start_date, end_date)
    window_rows = df_long.loc[in_window].copy()
    if window_rows.empty:
        return pd.Series()

    kept_rows = filter_statutory_holidays(window_rows)
    if kept_rows.empty:
        return pd.Series()

    # The divisor is the count of distinct remaining days, shared by all industries.
    n_days = kept_rows['date'].nunique()
    if n_days == 0:
        return pd.Series()

    load_totals = kept_rows.groupby('行业名称')['load'].sum()
    return load_totals / n_days

def calculate_full_month_daily_avg(df_long, year_month_str):
    """Return the per-industry average daily load for a whole calendar month.

    ``year_month_str`` is a 'YYYY-MM' string; the window runs from the first
    to the last day of that month and is delegated to
    ``calculate_period_daily_avg``.
    """
    year_text, month_text = year_month_str.split('-')
    month_start = pd.Timestamp(int(year_text), int(month_text), 1)
    # Last calendar day of the same month.
    month_end = month_start.replace(day=month_start.days_in_month)
    return calculate_period_daily_avg(df_long, month_start, month_end)

def get_val_from_series(data_series, name, alias_to_raw):
    """Look up the value of a report row, summing members for aggregate rows.

    ``data_series`` is indexed by raw industry name and ``alias_to_raw`` maps
    aliases to those raw names. When ``name`` is a key of
    ``Config.AGGREGATION_RULES`` its members are summed; a member that is not
    an alias but is itself an aggregation key is expanded one level. Anything
    that cannot be resolved contributes 0, as does a missing or empty series.
    """
    if data_series is None or data_series.empty:
        return 0

    rules = Config.AGGREGATION_RULES

    if name not in rules:
        # Plain row: direct alias lookup, 0 when the alias is unknown.
        if name in alias_to_raw:
            return data_series.get(alias_to_raw[name], 0)
        return 0

    total = 0
    for member in rules[name]:
        if member in alias_to_raw:
            leaves = [member]
        elif member in rules:
            # Nested aggregate: expand exactly one level.
            leaves = [leaf for leaf in rules[member] if leaf in alias_to_raw]
        else:
            leaves = []
        for leaf in leaves:
            total += data_series.get(alias_to_raw[leaf], 0)
    return total

def calculate_growth_rate(current, base):
    """Return the relative change of ``current`` versus ``base``.

    The change is divided by ``abs(base)`` so a negative base keeps the sign of
    the change. NaN is returned when either input is missing or ``base`` is 0.
    """
    has_both = not (pd.isna(current) or pd.isna(base))
    if has_both and base != 0:
        return (current - base) / abs(base)
    return np.nan

def main():
    """Read the daily-load workbook and write the multi-sheet comparison report.

    Steps:
      1. Load ``Config.INPUT_FILE`` and reshape the ``YYYYMMDD`` columns into a
         long table with 'date' and 'load' columns.
      2. Build the excluded-dates audit table.
      3. Map configured aliases to the raw industry names via the '序号' column.
      4. Compute holiday-filtered average daily load for the current window,
         the same window 1, 2 and 3 years earlier, the base month and the base
         month one year earlier.
      5. Write one sheet per entry of ``Config.SHEETS`` plus the audit sheet.

    Returns None. A failure while reading or reshaping the input is printed
    and aborts the run without raising.
    """
    print("开始生成综合报表 (差异化表结构 + 强化剔除版)...")

    try:
        df_raw = pd.read_excel(Config.INPUT_FILE)
        date_cols = [c for c in df_raw.columns if str(c).isdigit() and len(str(c)) == 8]
        df_long = df_raw.melt(id_vars=['序号', '行业名称'], value_vars=date_cols, var_name='date_str', value_name='load')
        df_long['date'] = pd.to_datetime(df_long['date_str'], format='%Y%m%d')
    except Exception as e:
        print(f"读取文件失败: {e}")
        return

    # 1. Excluded-dates audit table (for manual verification)
    df_exclusion = generate_exclusion_report(df_raw)

    # 2. alias -> raw industry name, for the row numbers present in the workbook
    raw_id_map = df_raw.set_index('序号')['行业名称'].to_dict()
    alias_to_raw = {
        alias: raw_id_map[uid]
        for uid, alias in Config.INDUSTRY_DEFINITIONS.items()
        if uid in raw_id_map
    }

    # 3. Comparison windows. DateOffset(years=n) is used instead of
    #    Timestamp.replace(year=...) because replace() raises ValueError when
    #    the window starts or ends on 29 February; DateOffset falls back to
    #    28 February and gives the same result for every other date.
    curr_start = pd.Timestamp(Config.CURRENT_START)
    curr_end = pd.Timestamp(Config.CURRENT_END)

    last_year_start = curr_start - pd.DateOffset(years=1)
    last_year_end = curr_end - pd.DateOffset(years=1)

    prev_year_start = curr_start - pd.DateOffset(years=2)
    prev_year_end = curr_end - pd.DateOffset(years=2)

    three_years_ago_start = curr_start - pd.DateOffset(years=3)
    three_years_ago_end = curr_end - pd.DateOffset(years=3)

    base_month_str = Config.BASE_MONTH_STR
    base_month_dt = pd.to_datetime(base_month_str)
    last_year_base_month_str = (base_month_dt - pd.DateOffset(years=1)).strftime('%Y-%m')

    print("正在计算各时间段日均电量数据池...")

    avg_curr = calculate_period_daily_avg(df_long, curr_start, curr_end)
    avg_last_year = calculate_period_daily_avg(df_long, last_year_start, last_year_end)
    avg_prev_year = calculate_period_daily_avg(df_long, prev_year_start, prev_year_end)
    avg_3_years_ago = calculate_period_daily_avg(df_long, three_years_ago_start, three_years_ago_end)

    avg_base_month = calculate_full_month_daily_avg(df_long, base_month_str)
    avg_last_year_base_month = calculate_full_month_daily_avg(df_long, last_year_base_month_str)

    # Column labels shared by every sheet; built once instead of per row.
    val_col = f'{Config.CURRENT_START[5:]}-{Config.CURRENT_END[5:]}\n日均售电量'
    mom_col = f'环比{int(base_month_str.split("-")[1])}月增长'

    # 4. Write the report.
    # NOTE(review): the "1.20" in the file name is hard-coded and does not
    # follow Config.CURRENT_END - confirm whether that is intended.
    excel_save_path = os.path.join(Config.OUTPUT_DIR, "报告综合分析报表1.20.xlsx")
    with pd.ExcelWriter(excel_save_path) as writer:

        for sheet_cfg in Config.SHEETS:
            sheet_name = sheet_cfg['name']
            sheet_type = sheet_cfg.get('type', 'standard')
            base_ind = sheet_cfg['base_industry']
            rows = sheet_cfg['rows']

            print(f"正在生成Sheet: {sheet_name} (类型: {sheet_type}) ...")
            report_data = []

            # Year-over-year change of the base industry: the denominator of
            # every contribution rate on this sheet.
            base_val_curr = get_val_from_series(avg_curr, base_ind, alias_to_raw)
            base_val_last = get_val_from_series(avg_last_year, base_ind, alias_to_raw)
            base_diff = base_val_curr - base_val_last

            for row_name in rows:
                val_curr = get_val_from_series(avg_curr, row_name, alias_to_raw)
                val_last_year = get_val_from_series(avg_last_year, row_name, alias_to_raw)
                val_prev_year = get_val_from_series(avg_prev_year, row_name, alias_to_raw)
                val_3_years_ago = get_val_from_series(avg_3_years_ago, row_name, alias_to_raw)

                yoy = calculate_growth_rate(val_curr, val_last_year)
                last_yoy = calculate_growth_rate(val_last_year, val_prev_year)

                # The base row has no contribution to itself, and a zero base
                # change would divide by zero.
                if row_name == base_ind or base_diff == 0:
                    contrib = np.nan
                else:
                    contrib = (val_curr - val_last_year) / base_diff

                row_data_dict = {
                    '行业名称': row_name,
                    # NOTE(review): "/ 1" looks like a placeholder for a unit
                    # conversion divisor - confirm; kept as in the original.
                    val_col: val_curr / 1,
                    '同比增长': yoy,
                    '上年同期同比增长': last_yoy
                }

                if sheet_type == 'trend':
                    # Trend sheets show one more year of year-over-year history.
                    row_data_dict['前年同期同比增长'] = calculate_growth_rate(val_prev_year, val_3_years_ago)
                else:
                    # Standard sheets compare against the base month instead.
                    val_last_month = get_val_from_series(avg_base_month, row_name, alias_to_raw)
                    val_last_year_month = get_val_from_series(avg_last_year_base_month, row_name, alias_to_raw)

                    row_data_dict[mom_col] = calculate_growth_rate(val_curr, val_last_month)
                    row_data_dict['上年同期环比增长'] = calculate_growth_rate(val_last_year, val_last_year_month)

                # The contribution rate is the last column on both sheet types.
                row_data_dict[f'对{base_ind}贡献率'] = contrib

                report_data.append(row_data_dict)

            df_sheet = pd.DataFrame(report_data)

            # Ratios are written as percentage text, "-" where undefined.
            pct_cols = [c for c in df_sheet.columns if '增长' in c or '贡献率' in c]
            for col in pct_cols:
                df_sheet[col] = df_sheet[col].apply(lambda x: f"{x:.2%}" if pd.notna(x) else "-")

            df_sheet[val_col] = df_sheet[val_col].apply(lambda x: round(x, 2))

            df_sheet.to_excel(writer, sheet_name=sheet_name, index=False)

        # Append the excluded-dates audit sheet.
        if not df_exclusion.empty:
            df_exclusion.to_excel(writer, sheet_name="剔除日期明细", index=False)
            print("已写入 '剔除日期明细' Sheet。")

    print(f"\n报表生成完毕: {excel_save_path}")

# Generate the report only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()

开始生成综合报表 (差异化表结构 + 强化剔除版)...
正在生成剔除日期明细报告...
正在计算各时间段日均电量数据池...
  -> [节假日剔除] 2026-01-01 至 2026-01-22: 剔除 1 天。
  -> [节假日剔除] 2025-01-01 至 2025-01-22: 剔除 8 天。
  -> [节假日剔除] 2024-01-01 至 2024-01-22: 剔除 1 天。
  -> [节假日剔除] 2023-01-01 至 2023-01-22: 剔除 16 天。
  -> [节假日剔除] 2025-12-01 至 2025-12-31: 未剔除 (0天)。
  -> [节假日剔除] 2024-12-01 至 2024-12-31: 未剔除 (0天)。
正在生成Sheet: 全行业概览 (类型: standard) ...
正在生成Sheet: 高耗能行业 (类型: standard) ...
正在生成Sheet: 高技术行业 (类型: standard) ...
正在生成Sheet: 消费品制造 (类型: standard) ...
正在生成Sheet: 第三产业细分 (类型: trend) ...
已写入 '剔除日期明细' Sheet。

报表生成完毕: E:\A智网\月度电力需求预测报告\1_分析结果\报告综合分析报表1.20.xlsx
