In [17]:
# --- 核心库 ---
import logging
import json
import cdsapi
import zipfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from zoneinfo import ZoneInfo
from typing import Tuple, Dict, List, Literal
import xarray as xr
import pandas as pd
import io
import matplotlib.pyplot as plt

# --- 项目模块 ---
from chromasky_toolkit import config
from chromasky_toolkit.map_drawer import generate_map_from_grid
from IPython.display import display, Image, clear_output

# --- 日志设置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("CamsAodPipeline")

print("✅ 环境设置与导入完成。")

✅ 环境设置与导入完成。


In [18]:
# ======================================================================
# --- 核心配置区：请在此处设置您的目标参数 ---
# ======================================================================

# 定义您想处理的未来事件意图列表。
# 可用选项: 'today_sunrise', 'today_sunset', 'tomorrow_sunrise', 'tomorrow_sunset'
TARGET_EVENT_INTENTIONS: List[Literal['today_sunrise', 'today_sunset', 'tomorrow_sunrise', 'tomorrow_sunset']] = [
    # "today_sunset",
    "tomorrow_sunrise",
    # "tomorrow_sunset"
]

# (可选) 定义要可视化的 AOD 变量
AOD_VAR_TO_VISUALIZE = 'aod550'

In [19]:
def _find_latest_available_cams_run() -> Tuple[datetime.date, str] | None:
    """
    智能判断当前可用的最新 CAMS 运行周期 (00z 或 12z)。
    CAMS 数据通常有较长的延迟，我们设置一个安全边际。
    """
    logger.info("--- [CAMS] 正在寻找最新的可用运行周期... ---")
    now_utc = datetime.now(timezone.utc)
    safe_margin = timedelta(hours=9)
    
    potential_runs = [
        (now_utc.date(), "12:00"),
        (now_utc.date(), "00:00"),
        (now_utc.date() - timedelta(days=1), "12:00"),
        (now_utc.date() - timedelta(days=1), "00:00"),
    ]

    for run_date, run_hour_str in potential_runs:
        run_time_utc = datetime.strptime(f"{run_date.strftime('%Y-%m-%d')} {run_hour_str}", "%Y-%m-%d %H:%M").replace(tzinfo=timezone.utc)
        if (now_utc - run_time_utc) >= safe_margin:
            logger.info(f"✅ 找到最新的可用运行周期: {run_date.strftime('%Y-%m-%d')} {run_hour_str} UTC")
            return run_date, run_hour_str
            
    logger.error("❌ 在过去48小时内未能找到任何满足安全边际的 CAMS 运行周期。")
    return None

def expand_target_events_for_cams(simple_events: List[str]) -> Dict[str, List[str]]:
    """将简单的事件意图展开为按天分组的具体事件列表。"""
    expanded_by_day = {'today': [], 'tomorrow': []}
    if 'today_sunrise' in simple_events:
        expanded_by_day['today'].extend(config.SUNRISE_EVENT_TIMES)
    if 'today_sunset' in simple_events:
        expanded_by_day['today'].extend(config.SUNSET_EVENT_TIMES)
    if 'tomorrow_sunrise' in simple_events:
        expanded_by_day['tomorrow'].extend(config.SUNRISE_EVENT_TIMES)
    if 'tomorrow_sunset' in simple_events:
        expanded_by_day['tomorrow'].extend(config.SUNSET_EVENT_TIMES)
    
    expanded_by_day['today'] = sorted(list(set(expanded_by_day['today'])))
    expanded_by_day['tomorrow'] = sorted(list(set(expanded_by_day['tomorrow'])))
    return expanded_by_day

def download_cams_aod_raw_data(run_date_obj: datetime.date, run_hour_str: str, events_to_process: List[str]) -> Path | None:
    """
    为指定的 CAMS 运行周期下载包含所有预报时效的原始 AOD 数据。
    如果数据已存在，则直接返回路径。
    该版本会为所有指定意图的事件（无论是否已过去）计算预报时效。
    """
    run_date_str = run_date_obj.strftime('%Y-%m-%d')
    output_dir_name = f"{run_date_obj.strftime('%Y%m%d')}_t{run_hour_str[:2]}z"
    output_dir = config.CAMS_AOD_DATA_DIR / output_dir_name
    output_dir.mkdir(parents=True, exist_ok=True)
    manifest_path = output_dir / "manifest_aod.json"
    final_output_file = output_dir / "aod_forecast.nc"
    temp_download_path = output_dir / "temp_download"

    if manifest_path.exists() and final_output_file.exists():
        logger.info(f"✅ 原始 CAMS AOD 数据和清单文件已在 '{output_dir}' 存在，跳过下载。")
        return final_output_file
        
    logger.info(f"--- [CAMS] 开始为运行周期 {run_date_str} {run_hour_str} UTC 下载新数据 ---")
    
    # --- 动态计算需要的预报时效 ---
    base_run_time = datetime.strptime(f"{run_date_str} {run_hour_str}", "%Y-%m-%d %H:%M").replace(tzinfo=timezone.utc)
    local_tz = ZoneInfo(config.LOCAL_TZ)
    today = datetime.now(local_tz).date()
    tomorrow = today + timedelta(days=1)
    
    expanded_events_by_day = expand_target_events_for_cams(events_to_process)
    
    target_utc_times = []
    # 移除 dt_local > now_local 的检查，确保所有意图都被处理
    if expanded_events_by_day['today']:
        for t_str in expanded_events_by_day['today']:
            dt_local = datetime.combine(today, datetime.strptime(t_str, '%H:%M').time(), tzinfo=local_tz)
            target_utc_times.append(dt_local.astimezone(timezone.utc))
    if expanded_events_by_day['tomorrow']:
        for t_str in expanded_events_by_day['tomorrow']:
            dt_local = datetime.combine(tomorrow, datetime.strptime(t_str, '%H:%M').time(), tzinfo=local_tz)
            target_utc_times.append(dt_local.astimezone(timezone.utc))

    if not target_utc_times:
        logger.warning("[CAMS] 根据您的配置，没有找到任何需要处理的事件。")
        return None

    # 计算预报时效，并过滤掉负值
    leadtime_hours_set = {round((t - base_run_time).total_seconds() / 3600) for t in target_utc_times}
    leadtime_hours_set = {h for h in leadtime_hours_set if h >= 0}

    if not leadtime_hours_set:
        logger.warning("[CAMS] 计算出的所有目标事件都早于最新的预报运行周期，无法下载。")
        return None
        
    leadtime_hours_list = sorted([str(h) for h in leadtime_hours_set])
    
    logger.info(f"将为 CAMS 运行周期下载 {len(leadtime_hours_list)} 个特定预报时效的数据: {leadtime_hours_list}")

    try:
        c = cdsapi.Client(timeout=600, quiet=False, url="https://ads.atmosphere.copernicus.eu/api", key=config.CDS_API_KEY)
        area_bounds = [config.AREA_EXTRACTION[k] for k in ["north", "west", "south", "east"]]
        
        request_params = {
            'date': run_date_str,
            'time': run_hour_str,
            'format': 'netcdf',
            'variable': config.CAMS_AOD_VARIABLES,
            'leadtime_hour': leadtime_hours_list,
            'type': 'forecast',
            'area': area_bounds
        }

        logger.info("正在向 Copernicus ADS 发送请求...")
        c.retrieve(config.CAMS_DATASET_NAME, request_params, str(temp_download_path))
        logger.info(f"✅ 临时文件已成功下载到: {temp_download_path}")

        if zipfile.is_zipfile(temp_download_path):
            logger.info("检测到下载文件为 ZIP 压缩包，开始解压...")
            with zipfile.ZipFile(temp_download_path, 'r') as zip_ref:
                nc_files_in_zip = [f for f in zip_ref.namelist() if f.endswith('.nc')]
                if not nc_files_in_zip:
                    raise FileNotFoundError("ZIP 包中未找到任何 .nc 文件！")
                source_nc_path_str = zip_ref.extract(nc_files_in_zip[0], path=output_dir)
                Path(source_nc_path_str).rename(final_output_file)
                logger.info(f"已将文件重命名为: {final_output_file}")
        else:
            logger.info("检测到下载文件为 NetCDF，直接重命名。")
            temp_download_path.rename(final_output_file)
        
        # 创建清单文件
        forecast_details = []
        for hour in sorted([int(h) for h in leadtime_hours_list]):
            forecast_time_utc = base_run_time + timedelta(hours=hour)
            forecast_time_local = forecast_time_utc.astimezone(local_tz)
            forecast_details.append({
                "leadtime_hour": hour,
                "forecast_time_utc": forecast_time_utc.isoformat(),
                "forecast_time_local": forecast_time_local.isoformat(),
            })

        manifest_content = {
            "base_run_time_utc": base_run_time.isoformat(),
            "data_file_path": str(final_output_file.relative_to(config.PROJECT_ROOT)),
            "forecasts": forecast_details
        }
        
        with open(manifest_path, 'w') as f:
            json.dump(manifest_content, f, indent=4)
        logger.info(f"✅ AOD 数据清单已成功写入: {manifest_path}")
        return final_output_file

    except Exception as e:
        logger.error(f"❌ 下载或处理 CAMS AOD 数据时发生严重错误: {e}", exc_info=True)
        return None
    finally:
        if temp_download_path.exists():
            temp_download_path.unlink()
            logger.info(f"已清理临时文件: {temp_download_path}")

def download_and_process_cams_aod(run_date_obj: datetime.date, run_hour_str: str, events_to_process: List[str]) -> Dict[str, List[Path]]:
    """为指定的 CAMS 运行周期，下载原始 AOD 数据，并将其处理分解。"""
    # 1. 下载原始 NetCDF 文件
    raw_nc_path = download_cams_aod_raw_data(run_date_obj, run_hour_str, events_to_process)
    
    processed_files_by_event: Dict[str, List[Path]] = {}
    
    if not (raw_nc_path and raw_nc_path.exists()):
        logger.error("❌ 原始 CAMS AOD 数据下载失败或未找到，无法进行处理。")
        return processed_files_by_event

    # --- 2. 处理原始 NetCDF 文件 ---
    logger.info("\n" + "-"*80)
    logger.info(f"--- 开始处理原始文件: {raw_nc_path.name} ---")

    try:
        with xr.open_dataset(raw_nc_path, engine="netcdf4") as ds:
            manifest_path = raw_nc_path.with_name("manifest_aod.json")
            if not manifest_path.exists():
                logger.error(f"❌ 清单文件 {manifest_path} 未找到，无法确定要处理的时间点。")
                return processed_files_by_event

            with open(manifest_path, 'r') as f:
                manifest = json.load(f)
            
            for forecast_info in manifest.get('forecasts', []):
                hour = forecast_info['leadtime_hour']
                local_dt = datetime.fromisoformat(forecast_info['forecast_time_local'])
                local_date_str = local_dt.strftime('%Y-%m-%d')
                local_time_str_path = local_dt.strftime('%H%M')
                
                event_key = f"{local_date_str}_{local_time_str_path}"
                processed_files_by_event[event_key] = []

                target_forecast_period = timedelta(hours=hour)
                time_slice = ds.sel(forecast_period=target_forecast_period, method='nearest').squeeze()

                for var_name in config.CAMS_AOD_VARIABLES:
                    if var_name in time_slice:
                        data_slice = time_slice[var_name]
                        
                        output_dir = config.PROCESSED_DATA_DIR / "future" / local_date_str
                        output_dir.mkdir(parents=True, exist_ok=True)
                        output_path = output_dir / f"{var_name}_{local_time_str_path}.nc"

                        data_slice.attrs['original_utc_time'] = forecast_info['forecast_time_utc']
                        data_slice.to_netcdf(output_path)
                        
                        logger.info(f"  ✅ 已将 '{var_name}' @ {local_time_str_path} 保存到: {output_path.relative_to(config.PROJECT_ROOT)}")
                        processed_files_by_event[event_key].append(output_path)

    except Exception as e:
        logger.error(f"❌ 处理原始 CAMS AOD 文件时发生严重错误: {e}", exc_info=True)

    return processed_files_by_event

In [21]:
# --- 1. 寻找最新的 CAMS 运行周期 ---
cams_run_info = _find_latest_available_cams_run()

if cams_run_info:
    run_date, run_hour = cams_run_info
    
    # --- 2. 步骤一: 下载原始数据 ---
    logger.info("\n" + "="*80)
    logger.info(f"===== 开始为 {run_date.strftime('%Y-%m-%d')} {run_hour}z 运行周期的 CAMS 预报执行数据流水线 =====")
    
    raw_nc_path = download_cams_aod_raw_data(run_date, run_hour, TARGET_EVENT_INTENTIONS)

    # --- 3. 步骤二: 处理原始数据并保存为独立文件 ---
    if not (raw_nc_path and raw_nc_path.exists()):
        logger.error("❌ 原始 CAMS AOD 数据下载失败或未找到，无法继续。")
    else:
        logger.info("\n" + "-"*80)
        logger.info(f"--- 步骤二: 开始处理原始文件: {raw_nc_path.name} ---")
        
        processed_files_by_event: Dict[str, List[Path]] = {}
        try:
            with xr.open_dataset(raw_nc_path, engine="netcdf4") as ds:
                manifest_path = raw_nc_path.with_name("manifest_aod.json")
                if not manifest_path.exists():
                    logger.error(f"❌ 清单文件 {manifest_path} 未找到，无法进行处理。")
                else:
                    # ==========================================================
                    # --- 核心调试：打印清单内容 ---
                    # ==========================================================
                    with open(manifest_path, 'r') as f:
                        manifest_content_str = f.read()
                        print("\n【诊断】读取到的 manifest_aod.json 文件内容:")
                        print(manifest_content_str)
                        # 重新解析
                        manifest = json.loads(manifest_content_str)
                    # ==========================================================

                    forecast_list = manifest.get('forecasts', [])
                    print(f"【诊断】从清单中提取到 {len(forecast_list)} 个预报事件。")
                    
                    for forecast_info in forecast_list:
                        hour = forecast_info['leadtime_hour']
                        local_dt = datetime.fromisoformat(forecast_info['forecast_time_local'])
                        local_date_str = local_dt.strftime('%Y-%m-%d')
                        local_time_str_path = local_dt.strftime('%H%M')
                        event_key = f"{local_date_str}_{local_time_str_path}"
                        
                        target_forecast_period = timedelta(hours=hour)
                        time_slice = ds.sel(forecast_period=target_forecast_period, method='nearest').squeeze()

                        for var_name in config.CAMS_AOD_VARIABLES:
                            if var_name in time_slice:
                                data_slice = time_slice[var_name]
                                output_dir = config.PROCESSED_DATA_DIR / "future" / local_date_str
                                output_dir.mkdir(parents=True, exist_ok=True)
                                output_path = output_dir / f"{var_name}_{local_time_str_path}.nc"
                                data_slice.attrs['original_utc_time'] = forecast_info['forecast_time_utc']
                                data_slice.to_netcdf(output_path)
                                
                                processed_files_by_event.setdefault(event_key, []).append(output_path)
                                logger.info(f"  ✅ 已保存: {output_path.relative_to(config.PROJECT_ROOT)}")
        except Exception as e:
            logger.error(f"❌ 处理原始 CAMS AOD 文件时发生严重错误: {e}", exc_info=True)

        # --- 4. 步骤三: 可视化 ---

        logger.info("✅ 所有数据处理完成，开始可视化...")
        
        if not processed_files_by_event:
            logger.warning("未能处理或生成任何文件，无法进行可视化。")
        else:
            for event_key, nc_files in sorted(processed_files_by_event.items()):
                if not nc_files: continue

                local_date, local_time = event_key.split('_')
                print("\n" + "="*80)
                print(f"--- 可视化事件 @ 本地时间: {local_date} {local_time[:2]}:{local_time[2:]} ---")
                
                # 我们只可视化用户指定的那个变量
                target_file = next((p for p in nc_files if p.name.startswith(AOD_VAR_TO_VISUALIZE)), None)

                if not target_file:
                    print(f"  - 未能找到变量 '{AOD_VAR_TO_VISUALIZE}' 的已处理文件。")
                    continue

                data_slice = xr.open_dataarray(target_file)
                utc_time_obj = datetime.fromisoformat(data_slice.attrs['original_utc_time'])
                
                map_title = (
                    f"{data_slice.attrs.get('long_name', data_slice.name)}\n"
                    f"Forecast: {local_date} {local_time[:2]}:{local_time[2:]} (Local)"
                )
                
                image_bytes = generate_map_from_grid(data_slice, map_title)
                if image_bytes:
                    display(Image(data=image_bytes, width=700))

else:
    logger.error("❌ 未能找到可用的 CAMS 运行周期，任务无法执行。")

2025-08-10 22:43:34,946 - CamsAodPipeline - INFO - --- [CAMS] 正在寻找最新的可用运行周期... ---
2025-08-10 22:43:34,947 - CamsAodPipeline - INFO - ✅ 找到最新的可用运行周期: 2025-08-10 00:00 UTC
2025-08-10 22:43:34,947 - CamsAodPipeline - INFO - 
2025-08-10 22:43:34,949 - CamsAodPipeline - INFO - ===== 开始为 2025-08-10 00:00z 运行周期的 CAMS 预报执行数据流水线 =====
2025-08-10 22:43:34,950 - CamsAodPipeline - INFO - ✅ 原始 CAMS AOD 数据和清单文件已在 'C:\Users\zhang\Documents\Code\chromasky-toolkit\src\data\raw\cams_aod\20250810_t00z' 存在，跳过下载。
2025-08-10 22:43:34,950 - CamsAodPipeline - INFO - 
--------------------------------------------------------------------------------
2025-08-10 22:43:34,951 - CamsAodPipeline - INFO - --- 步骤二: 开始处理原始文件: aod_forecast.nc ---
2025-08-10 22:43:34,960 - CamsAodPipeline - INFO - ✅ 所有数据处理完成，开始可视化...



【诊断】读取到的 manifest_aod.json 文件内容:
{
    "base_run_time_utc": "2025-08-10T00:00:00+00:00",
    "data_file_path": "data\\raw\\cams_aod\\20250810_t00z\\aod_forecast.nc",
    "forecasts": [
        {
            "leadtime_hour": 21,
            "forecast_time_utc": "2025-08-10T21:00:00+00:00",
            "forecast_time_local": "2025-08-11T05:00:00+08:00"
        }
    ]
}
【诊断】从清单中提取到 1 个预报事件。
