# Google Colab：GFS 数据合并 + 追踪 + 环境提取（复用现有代码）

本 Notebook 直接复用仓库中已经实现的逻辑（无需改算法）：

- **下载**：根据 `output/gfs_grib_urls_*.csv` 从公开 NOAA GFS S3 拉取 GRIB2
- **合并**：调用 `src/shared/grib_loader.py` 合并为单个 NetCDF（包含 `msl, 10u, 10v, z`）
- **轨迹**：根据 `input/matched_cyclone_tracks*.csv` 为每个预报周期生成轨迹 CSV
- **追踪与提取**：调用 `src/environment_extractor/pipeline.py::process_nc_files`
- **保存到 Drive**：把 `final_single_output/`、`track_single/` 复制到 Google Drive 的运行目录（运行日志在运行时直接写入该目录）

建议先用 `MAX_TCS=1` 做小规模验证，再跑全量。

In [None]:
# 1) Mount Google Drive and work out where the repository lives
import os
import subprocess
from pathlib import Path

from google.colab import drive

drive.mount('/content/drive')

USE_DRIVE_REPO = True  # True: the repo lives on Drive; False: clone it into /content
DRIVE_REPO_PATH = "/content/drive/MyDrive/TianGong-AI-Cyclone"
REPO_URL = "https://github.com/wenjie-shi-alpha/TianGong-AI-Cyclone.git"

# Local checkout location used whenever the Drive copy is not used.
_LOCAL_CLONE_PATH = "/content/TianGong-AI-Cyclone"

# Drive was requested but the checkout is absent there: switch to a local clone.
project = Path(DRIVE_REPO_PATH)
if USE_DRIVE_REPO and not project.exists():
    print(f"Drive 中未找到仓库目录: {project}，改为 clone 到 /content")
    USE_DRIVE_REPO = False

PROJECT_PATH = DRIVE_REPO_PATH if USE_DRIVE_REPO else _LOCAL_CLONE_PATH
project = Path(PROJECT_PATH)

# Shallow-clone only when nothing exists at the chosen location yet.
if not project.exists():
    clone_cmd = ["git", "clone", "--depth", "1", REPO_URL, str(project)]
    subprocess.run(clone_cmd, check=True)

# Later cells use repo-relative paths, so make the repo the working directory.
os.chdir(PROJECT_PATH)
print("PROJECT_PATH:", PROJECT_PATH)
print("CWD:", os.getcwd())
repo_listing = sorted(os.listdir('.'))
print("Repo files:", repo_listing[:20])

In [None]:
# 2) 安装依赖（Colab 环境）
# - cfgrib 依赖 ecCodes（系统库 + python 包）
# - 其余依赖来自 requirements.txt

%%bash
set -e
apt-get update -qq
# 尽量安装 eccodes；不同 Colab 镜像可能包名略有差异，做一个 fallback
apt-get install -y -qq libeccodes0 eccodes || apt-get install -y -qq libeccodes0
python3 -m pip install -q -r requirements.txt

In [None]:
# 3) Put the repo's src/ directory on sys.path and confirm the project modules import
import sys
from pathlib import Path

SRC_PATH = str(Path(PROJECT_PATH).joinpath("src"))
# Prepend only when absent so re-running the cell does not stack duplicates.
if sys.path.count(SRC_PATH) == 0:
    sys.path.insert(0, SRC_PATH)

# These names are not used in this cell; importing them is the check itself.
from shared.grib_loader import open_grib_collection  # noqa: F401
from environment_extractor.pipeline import process_nc_files  # noqa: F401

print("Import OK. SRC_PATH=", SRC_PATH)

In [None]:
# 4) Configure the inputs and run parameters
# Adjust these paths and parameters to match your own dataset / range

from datetime import datetime, timezone
from pathlib import Path

GRIB_URLS_CSV = "output/gfs_grib_urls_00_12_f000_f240_step6.csv"
# Defaults to the "remaining to process" cyclone list (used to resume a run)
TC_TRACKS_CSV = "input/matched_cyclone_tracks_remaining_from_log.csv"
INITIALS_CSV = "input/western_pacific_typhoons_superfast.csv"

# Small validation run first: set MAX_TCS to 1; use None for the full run
MAX_TCS = 1

# Number of parallel processes for tracking/extraction
# (environment extraction spawns child processes)
PROCESSES = 1

# Number of GRIB download threads within a single forecast cycle
DOWNLOAD_WORKERS = 8

# Whether to keep the intermediate merged NC files
# (default False: they are deleted automatically to save space)
KEEP_NC = False

# If the "remaining" track list is absent, fall back to the full track list.
if not Path(TC_TRACKS_CSV).exists():
    fallback_csv = "input/matched_cyclone_tracks_2021onwards.csv"
    if not Path(fallback_csv).exists():
        raise FileNotFoundError(f"缺少输入文件: {TC_TRACKS_CSV} 或 {fallback_csv}")
    print(f"⚠️ 未找到 {TC_TRACKS_CSV}，自动回退到: {fallback_csv}")
    TC_TRACKS_CSV = fallback_csv

# Report the first required input (in declaration order) that is missing.
missing_inputs = [
    name
    for name in (GRIB_URLS_CSV, TC_TRACKS_CSV, INITIALS_CSV)
    if not Path(name).exists()
]
if missing_inputs:
    raise FileNotFoundError(f"缺少输入文件: {missing_inputs[0]}（请确认仓库目录/文件已上传）")

# Each run gets its own Drive directory named after the current UTC timestamp.
DRIVE_OUTPUT_BASE = Path("/content/drive/MyDrive/TianGong-AI-Cyclone_outputs/gfs")
RUN_TAG = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%SZ")
RUN_DIR = DRIVE_OUTPUT_BASE.joinpath("run_" + RUN_TAG)
RUN_DIR.mkdir(parents=True, exist_ok=True)

print("RUN_DIR:", RUN_DIR)
print(
    "MAX_TCS:", MAX_TCS,
    "PROCESSES:", PROCESSES,
    "DOWNLOAD_WORKERS:", DOWNLOAD_WORKERS,
    "KEEP_NC:", KEEP_NC,
)

In [None]:
# 5) Run: reuse the existing script src/process_by_tc_lifetime.py
# Outputs are written into the local repo directory (final_single_output/,
# track_single/); the next step copies them to Drive.

import subprocess

cmd = [
    "python3",
    "src/process_by_tc_lifetime.py",
    "--grib-urls",
    GRIB_URLS_CSV,
    "--tc-tracks",
    TC_TRACKS_CSV,
    "--initials",
    INITIALS_CSV,
    "--processes",
    str(PROCESSES),
    "--download-workers",
    str(DOWNLOAD_WORKERS),
]
# Optional flags are only passed when the corresponding setting is enabled.
if MAX_TCS is not None:
    cmd += ["--max-tcs", str(MAX_TCS)]
if KEEP_NC:
    cmd += ["--keep-nc"]

log_path = RUN_DIR / "process_by_tc_lifetime.log"
print("CMD:", " ".join(cmd))
print("LOG:", log_path)

# stderr is merged into stdout so the notebook output and the log file show a
# single, ordered stream. Decoding is pinned to UTF-8 with errors="replace" so
# a stray undecodable byte in the child's output cannot abort the streaming
# loop with UnicodeDecodeError.
proc = subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
    encoding="utf-8",
    errors="replace",
    bufsize=1,
)
try:
    # buffering=1 makes the log line-buffered: every line reaches the file as
    # soon as it is produced, so the log stays useful if the runtime dies
    # in the middle of a long run.
    with log_path.open("w", encoding="utf-8", buffering=1) as fh:
        for line in proc.stdout:
            print(line, end="")
            fh.write(line)
    ret = proc.wait()
finally:
    # If streaming was interrupted (e.g. the cell was stopped) or raised,
    # do not leave the child process running in the background.
    if proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
    proc.stdout.close()

print("\nExit code:", ret)
if ret != 0:
    raise RuntimeError(f"process_by_tc_lifetime.py 运行失败 (exit={ret})，请查看日志: {log_path}")

In [None]:
# 6) 将结果复制到 Google Drive（建议只保存最终产物 + 轨迹 + 日志）

import shutil

def copy_tree(src: Path, dst: Path) -> None:
    """Copy a file or a directory tree from ``src`` to ``dst``.

    Args:
        src: Existing file or directory to copy. A missing path is reported
            and skipped rather than treated as an error.
        dst: Destination path. Its parent directories are created as needed.
            For a directory source, the copy merges into an existing ``dst``.

    Nothing is copied when ``src`` and ``dst`` refer to the same file or
    directory.
    """
    if not src.exists():
        print("Skip missing:", src)
        return
    # shutil refuses to copy a path onto itself (SameFileError for a file,
    # shutil.Error for a directory), so treat that case as already done.
    if dst.exists() and src.samefile(dst):
        print("Already in place:", dst)
        return
    dst.parent.mkdir(parents=True, exist_ok=True)
    if src.is_dir():
        shutil.copytree(src, dst, dirs_exist_ok=True)
    else:
        shutil.copy2(src, dst)
    print("Saved:", dst)

# Copy the final products and the tracks from the repo checkout into the
# Drive run directory.
repo_root = Path(PROJECT_PATH)
copy_tree(repo_root / "final_single_output", RUN_DIR / "final_single_output")
copy_tree(repo_root / "track_single", RUN_DIR / "track_single")

# When the log was written straight into RUN_DIR, source and destination are
# the same file and shutil.copy2 would raise SameFileError; only copy the log
# when it actually lives somewhere else.
log_dst = RUN_DIR / log_path.name
if log_path.resolve() != log_dst.resolve():
    copy_tree(log_path, log_dst)
else:
    print("Log already in run dir:", log_dst)

print("\nDrive output ready:")
print(RUN_DIR)

In [None]:
# 7) (Optional) Bundle the run directory into a zip for easy download/sharing

import shutil

# RUN_DIR is both the archive base name and the root being archived, so the
# zip is created alongside the run directory rather than inside it.
run_dir_str = str(RUN_DIR)
zip_path = shutil.make_archive(run_dir_str, "zip", root_dir=run_dir_str)
print("ZIP:", zip_path)