## VideoMetamaterials: 复现原始实验 + 加入 multi-scale consistency loss 对比（Colab）

本 notebook 用于在 **Google Colab (单卡 GPU)** 上：

- **下载并使用论文提供的同一数据集与预训练 checkpoint**（来自 Zenodo / ETHZ Research Collection）
- **复现 baseline**（`lambda_phys = 0`）：加载预训练模型，按论文方式生成条件采样视频
- **加入 physics consistency loss**（`lambda_phys > 0`）：在相同数据集上继续训练（fine-tune 或从头训练），并生成同样的条件采样视频
- **对比评估**：
  - 多尺度自洽指标（RG-style）：对生成视频做空间 coarse-graining，比较各尺度的统计表征差异
  - 可视化对比：输出 GIF 网格与 topologies

> 注意：论文的最终“物理真实性”评估需要 Abaqus 仿真。Colab 环境不包含 Abaqus，因此本 notebook 的对比侧重于 **生成质量 + 多尺度自洽性**（即你引入的 consistency loss 的直接目标）。



In [None]:
# === 0) 环境检查 ===
import os, sys, json, textwrap, subprocess, re
import torch

print('python:', sys.version)
print('torch:', torch.__version__)
print('cuda available:', torch.cuda.is_available())
if torch.cuda.is_available():
    print('gpu:', torch.cuda.get_device_name(0))
    print('capability:', torch.cuda.get_device_capability(0))



In [None]:
# === 1) 安装依赖（按 README 的最小集合）===
# Colab 自带 torch，通常无需降级；其余按需安装
!pip -q install einops==0.6.1 einops-exts==0.0.4 rotary-embedding-torch==0.2.3 accelerate==0.19.0 imageio==2.28.1 tqdm==4.65.0 networkx==2.8.4 matplotlib==3.8.0 pillow==10.0.1 pyyaml

# 可选：wandb
# !pip -q install wandb==0.15.2



In [None]:
# === 2) 进入项目目录（不使用 notebook magic，保证可复现）===
import subprocess, pathlib

REPO_DIR = '/content/VideoMetamaterials-main'

if not os.path.exists(REPO_DIR):
    subprocess.run(['git', 'clone', '--quiet', 'https://github.com/jhbastek/VideoMetamaterials.git'], check=True)
    # 原仓库目录一般叫 /content/VideoMetamaterials
    if os.path.exists('/content/VideoMetamaterials') and not os.path.exists(REPO_DIR):
        pathlib.Path(REPO_DIR).symlink_to('/content/VideoMetamaterials')

os.chdir(REPO_DIR)
print('cwd:', os.getcwd())



### 3) 下载同一数据集与预训练模型（Zenodo API 自动下载）

论文作者在 README 里给了数据与 checkpoint 的下载入口（ETHZ Research Collection / Zenodo）。这里用 **Zenodo API** 自动拉取并解压到正确目录结构：

- `data/lagrangian.zip` → 解压到 `data/lagrangian/`
- `runs/pretrained.zip` → 解压到 `runs/pretrained/`

如果网络无法访问 Zenodo，你也可以在 Colab 左侧手动上传 `lagrangian.zip` 和 `pretrained.zip`，然后运行同一段解压代码。



In [None]:
import requests, zipfile
from pathlib import Path

ZENODO_RECORD_ID = 10011767  # 来自 README 的 Zenodo DOI: 10.5281/zenodo.10011767

DATA_ZIP = Path('data/lagrangian.zip')
RUNS_ZIP = Path('runs/pretrained.zip')

DATA_ZIP.parent.mkdir(parents=True, exist_ok=True)
RUNS_ZIP.parent.mkdir(parents=True, exist_ok=True)


def zenodo_download(record_id: int, filename: str, out_path: Path):
    url = f'https://zenodo.org/api/records/{record_id}'
    r = requests.get(url, timeout=60)
    r.raise_for_status()
    j = r.json()
    files = j.get('files', [])
    match = None
    for f in files:
        if f.get('key') == filename:
            match = f
            break
    if match is None:
        raise RuntimeError(f'Zenodo record {record_id} does not contain {filename}. Found: {[f.get("key") for f in files]}')

    dl_url = match['links']['self']  # direct file endpoint
    print('Downloading:', dl_url)
    with requests.get(dl_url, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        total = int(resp.headers.get('content-length', 0))
        chunk = 1024 * 1024
        out_path_tmp = out_path.with_suffix(out_path.suffix + '.part')
        written = 0
        with open(out_path_tmp, 'wb') as f:
            for b in resp.iter_content(chunk_size=chunk):
                if not b:
                    continue
                f.write(b)
                written += len(b)
                if total:
                    print(f'  {written/total:.1%}', end='\r')
        out_path_tmp.replace(out_path)
    print('Saved:', out_path)


def unzip_to(zip_path: Path, target_dir: Path):
    print('Unzipping', zip_path, '->', target_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as z:
        z.extractall(target_dir)


# 自动下载（如文件不存在）
try:
    if not DATA_ZIP.exists():
        zenodo_download(ZENODO_RECORD_ID, 'lagrangian.zip', DATA_ZIP)
    if not RUNS_ZIP.exists():
        zenodo_download(ZENODO_RECORD_ID, 'pretrained.zip', RUNS_ZIP)
except Exception as e:
    print('自动下载失败：', repr(e))
    print('请在 Colab 左侧手动上传 lagrangian.zip 与 pretrained.zip 到 data/ 与 runs/ 目录，再继续运行下面的解压单元。')


# 解压到正确位置
if DATA_ZIP.exists() and not Path('data/lagrangian').exists():
    unzip_to(DATA_ZIP, Path('data'))

if RUNS_ZIP.exists() and not Path('runs/pretrained').exists():
    unzip_to(RUNS_ZIP, Path('runs'))

print('data/ contains:', sorted([p.name for p in Path('data').iterdir()]))
print('runs/ contains:', sorted([p.name for p in Path('runs').iterdir()]))



### 4) Baseline：加载论文预训练模型并生成条件采样视频

这一步复现 README 的行为：用 `data/target_responses.csv` 里的目标应力-应变曲线做 conditioning，生成视频并保存到 `runs/pretrained/eval_target_w_<w>_*/step_<step>/`。

为避免 Colab 下分布式初始化的问题，我们不调用仓库里的 `main.py`，而是在 notebook 里用同样的方式构建 `Unet3D / GaussianDiffusion / Trainer` 并加载 checkpoint。



In [None]:
# === 4.1) 加载配置 & 构建模型/Trainer（单卡 Colab，无需 torch.distributed）===
import yaml
from accelerate import Accelerator

from denoising_diffusion_pytorch import Unet3D, GaussianDiffusion, Trainer


def load_yaml(path: str):
    with open(path, 'r') as f:
        return yaml.safe_load(f)


def build_accelerator(fp16: bool = True):
    return Accelerator(mixed_precision='fp16' if fp16 else 'no')


def build_model_and_trainer(config: dict, run_dir: str, log: bool = False, fp16: bool = True):
    accelerator = build_accelerator(fp16=fp16)

    model = Unet3D(
        dim = config['unet_dim'],
        dim_mults = (1, 2, 4, 8),
        channels = len(config['selected_channels']),
        attn_heads = config['unet_attn_heads'],
        attn_dim_head = config['unet_attn_dim_head'],
        init_dim = None,
        init_kernel_size = 7,
        use_sparse_linear_attn = config['unet_use_sparse_linear_attn'],
        resnet_groups = config['unet_resnet_groups'],
        cond_bias = True,
        cond_attention = config['unet_cond_attention'],
        cond_attention_tokens = config['unet_cond_attention_tokens'],
        cond_att_GRU = config['unet_cond_att_GRU'],
        use_temporal_attention_cond = config['unet_temporal_att_cond'],
        cond_to_time = config['unet_cond_to_time'],
        per_frame_cond = config['per_frame_cond'],
        padding_mode = config['padding_mode'],
    )

    diffusion = GaussianDiffusion(
        model,
        image_size = 96,
        channels = len(config['selected_channels']),
        num_frames = 11,
        timesteps = config['train_timesteps'],
        loss_type = 'l1',
        use_dynamic_thres = config['use_dynamic_thres'],
        sampling_timesteps = config['sampling_timesteps'],
        # physics consistency (我们在 fork 版里加的参数；原版没有也不会报错，因为我们只在本 repo 使用)
        lambda_phys = float(config.get('lambda_phys', 0.0)),
        phys_num_levels = int(config.get('phys_num_levels', 2)),
    )

    data_dir = f"data/{config['reference_frame']}/training/"
    data_dir_validation = f"data/{config['reference_frame']}/validation/"

    trainer = Trainer(
        diffusion,
        folder = data_dir,
        validation_folder = data_dir_validation,
        results_folder = run_dir,
        selected_channels = config['selected_channels'],
        train_batch_size = config['batch_size'],
        test_batch_size = config['batch_size'],
        train_lr = config['learning_rate'],
        save_and_sample_every = 10000,
        train_num_steps = 200000,
        ema_decay = 0.995,
        log = log,
        null_cond_prob = 0.1,
        per_frame_cond = config['per_frame_cond'],
        reference_frame = config['reference_frame'],
        run_name = os.path.basename(os.path.normpath(run_dir)),
        accelerator = accelerator,
        wandb_username = None,
    )

    return accelerator, trainer


def find_latest_checkpoint(run_dir: str):
    import glob
    ckpts = sorted(glob.glob(os.path.join(run_dir, 'model', 'step_*', 'checkpoint.pt')))
    if not ckpts:
        raise FileNotFoundError(f'No checkpoints found under {run_dir}/model/step_*/checkpoint.pt')
    # pick highest step
    def step_of(p):
        m = re.search(r'step_(\d+)', p)
        return int(m.group(1)) if m else -1
    ckpts = sorted(ckpts, key=step_of)
    return ckpts[-1], step_of(ckpts[-1])


# baseline run 目录（来自 pretrained.zip）
BASE_RUN_DIR = 'runs/pretrained/'
BASE_CONFIG_PATH = os.path.join(BASE_RUN_DIR, 'model', 'model.yaml')
base_config = load_yaml(BASE_CONFIG_PATH)
print('Loaded base config from:', BASE_CONFIG_PATH)
print('base_config keys:', list(base_config.keys()))



In [None]:
# === 4.2) Baseline：加载 checkpoint + 生成 samples ===
TARGET_LABELS = 'data/target_responses.csv'
GUIDANCE_SCALE = 5.0
NUM_PREDS_PER_COND = 1  # 每条目标曲线生成几个样本

acc_base, trainer_base = build_model_and_trainer(base_config, BASE_RUN_DIR, log=False, fp16=True)

ckpt_path, ckpt_step = find_latest_checkpoint(BASE_RUN_DIR)
print('Baseline checkpoint:', ckpt_path, 'step=', ckpt_step)

# Trainer.load() 依赖 trainer.step
trainer_base.step = ckpt_step
trainer_base.load(strict=True)

# 生成论文同样的 conditioning samples（保存到 runs/pretrained/eval_target_w_xxx*/step_xxx/）
trainer_base.eval_target(TARGET_LABELS, guidance_scale=GUIDANCE_SCALE, num_preds=NUM_PREDS_PER_COND)
print('Baseline sampling done.')



### 5) Consistency 实验：相同数据集上 fine-tune（`lambda_phys > 0`）

我们在相同训练集 `data/lagrangian/training/` 上，从论文预训练 checkpoint 出发进行短暂 fine-tune，使模型显式最小化你加入的 multi-scale consistency loss。

- 你可以把 `LAMBDA_PHYS` 从 `1e-4` / `1e-3` 开始试。
- `FINETUNE_STEPS` 默认只跑少量步（Colab 可承受），用于对比趋势；想完全复现实验可把步数加大。



In [None]:
# === 5.1) 构建 consistency run 并从 baseline checkpoint 开始 fine-tune ===
import shutil

CONS_RUN_DIR = 'runs/consistency_finetune/'
os.makedirs(os.path.join(CONS_RUN_DIR, 'model'), exist_ok=True)
os.makedirs(os.path.join(CONS_RUN_DIR, 'training'), exist_ok=True)

LAMBDA_PHYS = 1e-4
PHYS_LEVELS = 2
FINETUNE_STEPS = 5000

cons_config = dict(base_config)
cons_config['lambda_phys'] = float(LAMBDA_PHYS)
cons_config['phys_num_levels'] = int(PHYS_LEVELS)

# 保存一份 config，方便复现
with open(os.path.join(CONS_RUN_DIR, 'model', 'model.yaml'), 'w') as f:
    yaml.safe_dump(cons_config, f)

acc_cons, trainer_cons = build_model_and_trainer(cons_config, CONS_RUN_DIR, log=False, fp16=True)

# 从 baseline checkpoint 加载
trainer_cons.step = ckpt_step
trainer_cons.load(strict=True)

# 将 train_num_steps 临时设成 finetune 步数（避免跑满 200k）
trainer_cons.train_num_steps = trainer_cons.step + FINETUNE_STEPS

print('Start finetune: lambda_phys=', LAMBDA_PHYS, 'steps=', FINETUNE_STEPS)
trainer_cons.train(load_model_step=ckpt_step, num_samples=1, num_preds=1)
print('Finetune done.')



In [None]:
# === 5.2) Consistency 模型：生成同样的 target samples ===
# 训练结束后 trainer_cons.step 会停在最后一步
cons_step = trainer_cons.step
print('Consistency finetuned step:', cons_step)
trainer_cons.eval_target(TARGET_LABELS, guidance_scale=GUIDANCE_SCALE, num_preds=NUM_PREDS_PER_COND)
print('Consistency sampling done.')



### 6) 对比评估：多尺度自洽性（RG-style）

我们对两种模型生成的视频，计算一个简单但直接的“多尺度差异”指标：

1. 对生成视频做空间 coarse-graining（2×2 平均池化，重复多次）得到不同尺度的视频；
2. 每个尺度上取空间平均得到时序特征 \(z^{(l)}_t\in\mathbb{R}^{C}\)；
3. 计算相邻尺度差异
\[
D_l = \frac{1}{T}\sum_t \lVert z^{(l)}_t - z^{(l+1)}_t \rVert_2.
\]

期望：加入一致性 loss 后，\(D_l\) 显著更小。



In [None]:
import glob
import torch.nn.functional as F
import matplotlib.pyplot as plt


def latest_eval_step_dir(run_dir: str, step: int, guidance_scale: float):
    # eval_target 的输出目录形如：runs/<run>/eval_target_w_<w>_<idx>/step_<step>/
    pattern = os.path.join(run_dir, f'eval_target_w_{guidance_scale}_*', f'step_{step}')
    candidates = sorted(glob.glob(pattern))
    if not candidates:
        raise FileNotFoundError(f'No eval_target outputs found for pattern: {pattern}')
    return candidates[-1]


def load_pred_grid_tensor(step_dir: str, pred_channel: int):
    # save_preds 存的是一个拼成网格的 GIF，文件名 prediction_channel_<ch>.gif
    gif_path = os.path.join(step_dir, 'gifs', f'prediction_channel_{pred_channel}.gif')
    from denoising_diffusion_pytorch.video_denoising_diffusion_pytorch import gif_to_tensor
    t = gif_to_tensor(gif_path, channels=1)  # (1, f, H, W)
    return t


def compute_multiscale_D(video: torch.Tensor, levels: int = 2):
    """video: (c, f, h, w) tensor. Return list [D0, D1, ...]."""
    # reshape to (1,c,f,h,w)
    x = video.unsqueeze(0)
    pool = torch.nn.AvgPool3d(kernel_size=(1,2,2), stride=(1,2,2))
    zs = []
    for l in range(levels + 1):
        z = x.mean(dim=(-1, -2))  # (1,c,f)
        zs.append(z)
        if l < levels:
            x = pool(x)
    Ds = []
    for l in range(levels):
        d = torch.sqrt(((zs[l] - zs[l+1]) ** 2).mean()).item()
        Ds.append(d)
    return Ds


# 选择一个通道用于一致性评估（建议用位移 u_1=0 或 u_2=1）
EVAL_CH = int(base_config['selected_channels'][0])
levels = int(cons_config.get('phys_num_levels', 2))

base_eval_dir = latest_eval_step_dir(BASE_RUN_DIR, ckpt_step, GUIDANCE_SCALE)
cons_eval_dir = latest_eval_step_dir(CONS_RUN_DIR, cons_step, GUIDANCE_SCALE)
print('Baseline eval dir:', base_eval_dir)
print('Consistency eval dir:', cons_eval_dir)

base_vid = load_pred_grid_tensor(base_eval_dir, EVAL_CH)  # (1,f,H,W)
cons_vid = load_pred_grid_tensor(cons_eval_dir, EVAL_CH)

base_D = compute_multiscale_D(base_vid, levels=levels)
cons_D = compute_multiscale_D(cons_vid, levels=levels)

print('Baseline D:', base_D)
print('Consistency D:', cons_D)

# 画柱状图对比
x = list(range(levels))
plt.figure(figsize=(6,4))
plt.bar([i-0.15 for i in x], base_D, width=0.3, label='baseline (lambda=0)')
plt.bar([i+0.15 for i in x], cons_D, width=0.3, label=f'consistency (lambda={LAMBDA_PHYS})')
plt.xticks(x, [f'D{i}' for i in x])
plt.ylabel('multi-scale difference (lower is better)')
plt.title(f'Multi-scale consistency on generated GIF grid (channel {EVAL_CH})')
plt.legend()
plt.grid(alpha=0.3)
plt.show()



### 7) 查看输出文件

- Baseline 输出：`runs/pretrained/eval_target_w_<w>_<idx>/step_<step>/gifs/`
- Consistency 输出：`runs/consistency_finetune/eval_target_w_<w>_<idx>/step_<step>/gifs/`

你可以直接在左侧文件树里点开 GIF；或者用下面的 cell 直接在 notebook 内显示。



In [None]:
from IPython.display import Image, display

# 展示一个通道的 GIF（网格）
base_step_dir = latest_eval_step_dir(BASE_RUN_DIR, ckpt_step, GUIDANCE_SCALE)
cons_step_dir = latest_eval_step_dir(CONS_RUN_DIR, cons_step, GUIDANCE_SCALE)

base_gif = os.path.join(base_step_dir, 'gifs', f'prediction_channel_{EVAL_CH}.gif')
cons_gif = os.path.join(cons_step_dir, 'gifs', f'prediction_channel_{EVAL_CH}.gif')

print('Baseline gif:', base_gif)
display(Image(filename=base_gif))

print('Consistency gif:', cons_gif)
display(Image(filename=cons_gif))

