# PhysGenRD All-in-One Notebook
필수 기능만 남기고 정리한 실행형 워크플로입니다.
- 설정/목표 곡선 구성
- SDF + FNO 서로게이트(surrogate) + Latent Diffusion
- Forward 예측 / Reverse 설계(단발+루프)
- 평가/요약 저장


## 사진 요소 반영 안내
요청하신 **사진의 구성 요소**를 노트북에 추가하려면 해당 이미지(또는 요소 목록)를 제공해 주세요.
이미지 전달 시 동일 섹션에 UI/도식 요소를 그대로 반영하겠습니다.

추가로, 요청에 따라 `Grain (1).ipynb`, `physgenrd.py`, `forward.py` 파일은 삭제했습니다. 단, 아래 첫 번째 코드 셀은 여전히 `physgenrd` 모듈에서 구성 요소를 import하므로, 해당 모듈이 import 가능한 경로에 있어야 노트북이 실행됩니다.


In [None]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt

from physgenrd import (
    GrainConfig, DiffusionConfig, TrainingConfig,
    set_seed, make_grid, build_target_pressure_curve,
    CurveEncoder, NeuralSDFField, PhysicsSurrogate, LatentDiffusion,
    forward_performance, reverse_design, reverse_design_loop,
    sdf_to_occupancy, loading_fraction, occupancy_smoothness,
    surface_area_from_w
)

# Call the project's seeding helper with a fixed value.
# NOTE(review): presumably seeds the torch/numpy RNGs — confirm in physgenrd.
set_seed(42)
# Directory that the figures and the summary file below are written into;
# exist_ok=True keeps re-runs from failing when it already exists.
os.makedirs('out_physgenrd', exist_ok=True)


In [None]:
# === Core (FNO Surrogate + Forward/Reverse) ===
import math
import torch.nn as nn

class SpectralConv3d(nn.Module):
    """Channel-mixing convolution applied in the Fourier domain.

    The input is transformed with a real FFT over its last three axes, the
    leading ``modes`` coefficients along each of those axes are mixed across
    channels with a learned complex weight, every other coefficient is set to
    zero, and the result is transformed back to the input's spatial size.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        modes: Number of leading Fourier coefficients kept per axis. When an
            axis is shorter than ``modes`` only the available coefficients
            (and the matching slice of the weight) are used.
    """

    def __init__(self, in_channels: int, out_channels: int, modes: int) -> None:
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.modes = modes
        # Scale the random initial weights down by the channel product.
        init_scale = 1 / (in_channels * out_channels)
        weight_shape = (in_channels, out_channels, modes, modes, modes)
        self.weight = nn.Parameter(
            init_scale * torch.randn(*weight_shape, dtype=torch.cfloat)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the spectral convolution to a 5-D tensor ``(batch, channels, nx, ny, nz)``.

        Returns a real tensor of shape ``(batch, out_channels, nx, ny, nz)``.
        """
        batch, _, nx, ny, nz = x.shape
        # rfftn keeps only the non-redundant half of the last axis.
        half_nz = nz // 2 + 1
        spectrum = torch.fft.rfftn(x, dim=(-3, -2, -1))

        # Index selecting the retained low-order coefficients.
        keep = (
            slice(None),
            slice(None),
            slice(0, min(self.modes, nx)),
            slice(0, min(self.modes, ny)),
            slice(0, min(self.modes, half_nz)),
        )

        mixed = torch.zeros(
            batch, self.out_channels, nx, ny, half_nz, device=x.device, dtype=torch.cfloat
        )
        # Contract the input-channel axis against the weight, per retained mode.
        mixed[keep] = torch.einsum('bixyz,ioxyz->boxyz', spectrum[keep], self.weight[keep])
        return torch.fft.irfftn(mixed, s=(nx, ny, nz))

class FNOBlock(nn.Module):
    """One FNO layer: spectral branch plus 1x1x1 convolution branch, then GELU.

    Args:
        hidden: Channel count of both the input and the output.
        modes: Number of Fourier modes passed to the spectral convolution.
    """

    def __init__(self, hidden: int, modes: int) -> None:
        super().__init__()
        self.spectral = SpectralConv3d(hidden, hidden, modes)
        self.pointwise = nn.Conv3d(hidden, hidden, kernel_size=1)
        self.act = nn.GELU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Sum the two branches computed from the same input and activate."""
        spectral_branch = self.spectral(x)
        pointwise_branch = self.pointwise(x)
        return self.act(spectral_branch + pointwise_branch)

class FNO3D(nn.Module):
    """Stack of FNO blocks between a 1x1x1 input lift and a 1x1x1 output projection.

    Args:
        in_channels: Channel count of the input volume.
        hidden: Channel count used inside every block.
        modes: Number of Fourier modes used by every block.
        layers: Number of FNO blocks.

    The output always has a single channel.
    """

    def __init__(self, in_channels: int, hidden: int, modes: int, layers: int) -> None:
        super().__init__()
        self.in_proj = nn.Conv3d(in_channels, hidden, kernel_size=1)
        stack = nn.ModuleList()
        for _ in range(layers):
            stack.append(FNOBlock(hidden, modes))
        self.blocks = stack
        self.out_proj = nn.Conv3d(hidden, 1, kernel_size=1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Lift the input, run it through every block in order, and project to one channel."""
        features = self.in_proj(x)
        for layer in self.blocks:
            features = layer(features)
        return self.out_proj(features)

class PhysicsSurrogateCore(nn.Module):
    """Thin wrapper around a fixed-size ``FNO3D`` (1 input channel, 32 hidden, 8 modes, 3 layers).

    Args:
        grid_size: Stored on the instance as ``grid_size``; the network itself
            is not sized by it.
    """

    def __init__(self, grid_size: int) -> None:
        super().__init__()
        fno_settings = dict(in_channels=1, hidden=32, modes=8, layers=3)
        self.net = FNO3D(**fno_settings)
        self.grid_size = grid_size

    def forward(self, sdf_grid: torch.Tensor) -> torch.Tensor:
        """Run the wrapped FNO on ``sdf_grid`` and return its output unchanged."""
        prediction = self.net(sdf_grid)
        return prediction

def forward_core(sdf_field, encoder, surrogate, coords_flat, cfg, z, cond):
    """Evaluate the SDF on the grid, run the surrogate, and derive pressure and thrust.

    Args:
        sdf_field: Callable invoked as ``sdf_field(coords, z, cond)``; its output
            must be reshapeable to ``(grid_size, grid_size, grid_size)``.
        encoder: Accepted for signature parity with the other entry points; not used here.
        surrogate: Callable applied to the SDF volume with two leading singleton axes added.
        coords_flat: Flattened grid coordinates; moved to ``cfg.device`` before use.
        cfg: Object providing ``device``, ``grid_size`` and ``throat_area``.
        z: Latent passed through to ``sdf_field``.
        cond: Conditioning passed through to ``sdf_field``.

    Returns:
        dict with
        ``'w'``: the surrogate output with its two leading axes squeezed away,
        ``'pressure'``: the mean of ``w`` over its first two axes, clamped below at 0,
        ``'thrust'``: ``pressure * cfg.throat_area``.
    """
    n = cfg.grid_size
    grid_points = coords_flat.to(cfg.device)
    sdf_volume = sdf_field(grid_points, z, cond).reshape(n, n, n)

    # Add leading batch and channel axes for the surrogate, then drop them again.
    surrogate_out = surrogate(sdf_volume[None, None])
    web = surrogate_out.squeeze(0).squeeze(0)

    # One value per index of the last axis; negative means are clipped to zero.
    chamber_pressure = torch.clamp(web.mean(dim=(0, 1)), min=0.0)
    return {
        'pressure': chamber_pressure,
        'thrust': chamber_pressure * cfg.throat_area,
        'w': web,
    }

def reverse_core(sdf_field, encoder, surrogate, coords_flat, cfg, latent_dim, target_curve, steps=50, lr=1e-2):
    """Optimise a random latent so the predicted pressure matches ``target_curve``.

    Only the latent ``z`` is handed to the optimiser; the models are used as
    fixed differentiable functions.

    Args:
        sdf_field, encoder, surrogate, coords_flat, cfg: Forwarded to ``forward_core``;
            ``encoder`` is also called once on the target curve to build the conditioning.
        latent_dim: Size of the 1-D latent vector that is optimised.
        target_curve: Target pressure curve; moved to ``cfg.device``.
        steps: Number of Adam updates. ``0`` returns the prediction for the
            initial random latent.
        lr: Adam learning rate.

    Returns:
        dict with detached ``'pressure'`` and ``'thrust'`` evaluated at the
        returned latent, and the detached latent under ``'z'``.
    """
    # Move the target once instead of on every iteration.
    target = target_curve.to(cfg.device)

    z = torch.randn(latent_dim, device=cfg.device, requires_grad=True)
    opt = torch.optim.Adam([z], lr=lr)

    # The conditioning is constant with respect to z. Computing it without a
    # graph matters: a graph-attached cond would be shared by every iteration,
    # and the second loss.backward() would then try to traverse the encoder
    # graph whose buffers were already freed by the first one.
    with torch.no_grad():
        cond = encoder(target)

    for _ in range(steps):
        out = forward_core(sdf_field, encoder, surrogate, coords_flat, cfg, z, cond)
        loss = torch.mean((out['pressure'] - target) ** 2)
        opt.zero_grad()
        loss.backward()
        opt.step()

    # Re-evaluate after the last update so the reported curves belong to the
    # returned z (and so steps=0 still produces a result).
    with torch.no_grad():
        final = forward_core(sdf_field, encoder, surrogate, coords_flat, cfg, z, cond)

    return {
        'pressure': final['pressure'].detach(),
        'thrust': final['thrust'].detach(),
        'z': z.detach(),
    }


## STEP 1: 설정 및 목표 성능 곡선 구성


In [None]:
# Instantiate the three configuration objects with their default values.
cfg = GrainConfig()
diff_cfg = DiffusionConfig()
train_cfg = TrainingConfig()

# make_grid returns four values; the visible cells below only use coords_flat.
# NOTE(review): xx/yy/zz are presumably per-axis coordinate grids — confirm in physgenrd.
xx, yy, zz, coords_flat = make_grid(cfg)
target_curve = build_target_pressure_curve(cfg)

# The encoder is constructed from the length of the target curve's last axis.
encoder = CurveEncoder(target_curve.shape[-1]).to(cfg.device)
# Conditioning produced from the target curve; reused by the cells below.
cond = encoder(target_curve.to(cfg.device))

# Plot the target curve for a visual check.
plt.figure(figsize=(7, 3))
plt.plot(target_curve.cpu().numpy(), label='Target Pc')
plt.title('Target Performance Curve')
plt.legend()
plt.tight_layout()
plt.show()


## STEP 2: 모델 구성 및 기본 진단


In [None]:
# Build the three models and move them to the configured device. The SDF field
# and the diffusion model are both constructed from the conditioning's last-axis size.
sdf_field = NeuralSDFField(cond.shape[-1]).to(cfg.device)
surrogate = PhysicsSurrogate(cfg.grid_size).to(cfg.device)
diffusion = LatentDiffusion(diff_cfg, cond_dim=cond.shape[-1]).to(cfg.device)

# Sample a latent for the conditioning, evaluate the SDF at the flattened grid
# coordinates, and reshape the result to a grid_size^3 cube.
z = diffusion.sample(cond)
sdf_values = sdf_field(coords_flat.to(cfg.device), z, cond).reshape(cfg.grid_size, cfg.grid_size, cfg.grid_size)
phi = sdf_to_occupancy(sdf_values)
# Grid step: a span of 2 * case_radius divided into grid_size - 1 intervals.
spacing = (2 * cfg.case_radius) / (cfg.grid_size - 1)
loading = loading_fraction(phi, cfg, spacing)
smoothness = occupancy_smoothness(phi)

# Both diagnostics are read with .item(), i.e. they must be single-element tensors.
print(f'Loading fraction: {loading.item():.3f}')
print(f'Smoothness penalty: {smoothness.item():.4f}')


## STEP 2-1: Forward 성능 예측 및 연소면적 진단


In [None]:
# Forward prediction for the latent sampled in the previous cell.
forward_out = forward_performance(sdf_field, encoder, surrogate, coords_flat, cfg, z, cond)

# Predicted vs. target pressure. detach() is required before numpy() whenever
# the tensor is attached to an autograd graph, and is a no-op otherwise; the
# later cells already detach these outputs before converting them.
plt.figure(figsize=(8, 4))
plt.plot(forward_out['pressure'].detach().cpu().numpy(), label='Predicted Pc')
plt.plot(target_curve.cpu().numpy(), label='Target Pc', linestyle='--')
plt.legend()
plt.title('Forward Prediction')
plt.tight_layout()
plt.savefig('out_physgenrd/allinone_forward_pressure.png')
plt.show()

# Burning-surface diagnostic: run the surrogate on the SDF volume (with batch
# and channel axes added) and evaluate the area helper at 50 evenly spaced
# levels between 0 and the maximum of the surrogate output.
with torch.no_grad():
    sdf_grid = sdf_values.unsqueeze(0).unsqueeze(0)
    w = surrogate(sdf_grid).squeeze(0).squeeze(0)
    levels = torch.linspace(0.0, w.max().item(), steps=50, device=w.device)
    area_curve = surface_area_from_w(w, spacing, levels).cpu().numpy()

plt.figure(figsize=(7, 3))
plt.plot(area_curve, label='A(w)')
plt.title('Estimated Burning Surface Area')
plt.legend()
plt.tight_layout()
plt.show()


In [None]:
# === Multi-objective loss (다목적 최적화) ===
def web_thickness_stats(w: torch.Tensor):
    """Return ``(min, max, mean)`` over every element of ``w`` as 0-dim tensors."""
    values = w.reshape(-1)
    lowest = torch.min(values)
    highest = torch.max(values)
    average = torch.mean(values)
    return lowest, highest, average

def center_stability_penalty(phi: torch.Tensor):
    """Squared shortfall from 1 of the occupancy at the grid's central cell.

    A simple stability indicator: the lower the occupancy at the centre, the
    larger the penalty. The same index, ``phi.shape[0] // 2``, is used on all
    three axes.
    """
    mid = phi.shape[0] // 2
    shortfall = 1.0 - phi[mid, mid, mid]
    return shortfall ** 2

def multi_objective_loss(
    pressure_pred: torch.Tensor,
    thrust_pred: torch.Tensor,
    target_pressure: torch.Tensor,
    phi: torch.Tensor,
    w: torch.Tensor,
    cfg: GrainConfig,
    weights: dict,
    target_loading: float = 0.6,
    min_web: float = 0.01,
    max_web: float = 0.2,
    target_thrust: "torch.Tensor | None" = None,
):
    """Weighted sum of the design objectives.

    Args:
        pressure_pred: Predicted pressure curve.
        thrust_pred: Predicted thrust curve.
        target_pressure: Target pressure curve.
        phi: Occupancy volume, passed to the loading, smoothness and centre-stability terms.
        w: Surrogate output whose global min/max are bounded by ``min_web`` / ``max_web``.
        cfg: Grain configuration; ``case_radius``, ``grid_size`` and ``throat_area`` are read.
        weights: Mapping with the keys ``'pc'``, ``'loading'``, ``'smooth'``,
            ``'web'``, ``'stability'`` and ``'thrust'``.
        target_loading: Desired loading fraction.
        min_web: Lower bound on ``w.min()``; violations are penalised linearly.
        max_web: Upper bound on ``w.max()``; violations are penalised linearly.
        target_thrust: Target thrust curve. When omitted it is derived as
            ``target_pressure * cfg.throat_area``, the same relation
            ``forward_core`` uses to turn pressure into thrust.

    Returns:
        Scalar loss tensor.
    """
    spacing = (2 * cfg.case_radius) / (cfg.grid_size - 1)
    loading = loading_fraction(phi, cfg, spacing)
    smooth = occupancy_smoothness(phi)
    w_min, w_max, _ = web_thickness_stats(w)
    # One-sided hinge penalties: zero while w stays inside [min_web, max_web].
    web_penalty = torch.relu(min_web - w_min) + torch.relu(w_max - max_web)
    stability = center_stability_penalty(phi)

    # Compare against an actual target. The previous form subtracted the
    # detached prediction from itself, which is zero in value and in gradient,
    # so the 'thrust' weight had no effect.
    if target_thrust is None:
        target_thrust = target_pressure * cfg.throat_area
    thrust_mse = torch.mean((thrust_pred - target_thrust) ** 2)

    loss = (
        weights['pc'] * torch.mean((pressure_pred - target_pressure) ** 2)
        + weights['loading'] * (loading - target_loading).pow(2)
        + weights['smooth'] * smooth
        + weights['web'] * web_penalty
        + weights['stability'] * stability
        + weights['thrust'] * thrust_mse
    )
    return loss


In [None]:
# === Constraint enforcement utilities ===
def constraint_penalty(
    pressure_pred: torch.Tensor,
    phi: torch.Tensor,
    w: torch.Tensor,
    cfg: GrainConfig,
    min_load: float,
    max_load: float,
    max_pressure: float,
    min_web: float,
    enforce_shell: bool = True,
):
    """Sum of hinge penalties for the design constraints.

    Args:
        pressure_pred: Predicted pressure curve; its maximum is bounded by ``max_pressure``.
        phi: 3-D occupancy volume.
        w: Surrogate output; its minimum is bounded below by ``min_web``.
        cfg: Grain configuration; ``case_radius`` and ``grid_size`` are read.
        min_load: Lower bound on the loading fraction.
        max_load: Upper bound on the loading fraction.
        max_pressure: Upper bound on the peak predicted pressure.
        min_web: Lower bound on ``w.min()``.
        enforce_shell: When true, add the mean occupancy over the six outermost
            grid faces, i.e. penalise propellant on the outer shell of the case.

    Returns:
        Scalar tensor: load + pressure + web + shell penalties.
    """
    spacing = (2 * cfg.case_radius) / (cfg.grid_size - 1)
    loading = loading_fraction(phi, cfg, spacing)
    load_penalty = torch.relu(min_load - loading) + torch.relu(loading - max_load)
    pressure_penalty = torch.relu(pressure_pred.max() - max_pressure)
    web_penalty = torch.relu(min_web - w.min())

    shell_penalty = torch.tensor(0.0, device=phi.device)
    if enforce_shell:
        # No propellant is allowed on the outermost grid layer of the case.
        faces = (
            phi[0:1, :, :], phi[-1:, :, :],
            phi[:, 0:1, :], phi[:, -1:, :],
            phi[:, :, 0:1], phi[:, :, -1:],
        )
        # The faces have different shapes ((1,N,N), (N,1,N), (N,N,1)), so they
        # cannot be concatenated along an axis directly; flatten each first.
        # Cells on edges and corners belong to several faces and are counted
        # once per face.
        shell_penalty = torch.cat([face.reshape(-1) for face in faces]).mean()

    return load_penalty + pressure_penalty + web_penalty + shell_penalty

def barrier_loss(violation: torch.Tensor, mu: float = 1e-2):
    """Log-barrier term ``-mu * log(1 - violation)``.

    The log argument is clamped to at least ``1e-6``, so the value stays finite
    once ``violation`` reaches or exceeds 1.
    """
    slack = (1.0 - violation).clamp(min=1e-6)
    return slack.log().mul(-mu)

def augmented_lagrangian(
    base_loss: torch.Tensor,
    constraint_vals: dict,
    lambdas: dict,
    rho: float = 10.0,
):
    """Add multiplier and quadratic penalty terms for every constraint to ``base_loss``.

    For each entry ``g`` of ``constraint_vals`` the terms ``lambda * g`` and
    ``0.5 * rho * g**2`` are added, where ``lambda`` is looked up in ``lambdas``
    under the same key and treated as zero when the key is absent.
    """
    augmented = base_loss
    for name in constraint_vals:
        g = constraint_vals[name]
        if name in lambdas:
            multiplier = lambdas[name]
        else:
            multiplier = torch.tensor(0.0, device=g.device)
        augmented = augmented + multiplier * g + 0.5 * rho * g.pow(2)
    return augmented


## STEP 3: Reverse Design (단발 + 루프)


In [None]:
# Single-shot reverse design.
reverse_single = reverse_design(
    sdf_field=sdf_field,
    encoder=encoder,
    surrogate=surrogate,
    coords_flat=coords_flat,
    cfg=cfg,
    diff_cfg=diff_cfg,
    train_cfg=train_cfg,
    target_curve=target_curve,
)

# Looped reverse design, called with the same arguments.
reverse_out = reverse_design_loop(
    sdf_field=sdf_field,
    encoder=encoder,
    surrogate=surrogate,
    coords_flat=coords_flat,
    cfg=cfg,
    diff_cfg=diff_cfg,
    train_cfg=train_cfg,
    target_curve=target_curve,
)

# Compare both results with the target. detach() is required before numpy()
# whenever the tensor is attached to an autograd graph, and is a no-op
# otherwise; the summary cell already detaches these same outputs.
plt.figure(figsize=(8, 4))
plt.plot(reverse_single['pressure'].detach().cpu().numpy(), label='Reverse Pc (single)')
plt.plot(reverse_out['pressure'].detach().cpu().numpy(), label='Reverse Pc (loop)')
plt.plot(target_curve.cpu().numpy(), label='Target Pc', linestyle='--')
plt.legend()
plt.title('Reverse Design Results')
plt.tight_layout()
plt.savefig('out_physgenrd/allinone_reverse_compare.png')
plt.show()


In [None]:
# === Example usage of the Core-FNO-based forward/reverse functions ===
# A freshly constructed PhysicsSurrogateCore is used here; no training or
# weight loading happens in this cell.
surrogate_core = PhysicsSurrogateCore(cfg.grid_size).to(cfg.device)
# Random latent used only for the forward call; reverse_core draws its own.
z_core = torch.randn(diff_cfg.latent_dim, device=cfg.device)
core_forward = forward_core(sdf_field, encoder, surrogate_core, coords_flat, cfg, z_core, cond)
core_reverse = reverse_core(
    sdf_field, encoder, surrogate_core, coords_flat, cfg, diff_cfg.latent_dim, target_curve, steps=25, lr=5e-3
)

# Plot both core results against the target; outputs are detached before
# conversion to numpy.
plt.figure(figsize=(8, 4))
plt.plot(core_forward['pressure'].detach().cpu().numpy(), label='Core Forward Pc')
plt.plot(core_reverse['pressure'].detach().cpu().numpy(), label='Core Reverse Pc')
plt.plot(target_curve.cpu().numpy(), label='Target Pc', linestyle='--')
plt.legend()
plt.title('Core FNO Forward/Reverse')
plt.tight_layout()
plt.savefig('out_physgenrd/allinone_core_forward_reverse.png')
plt.show()


## 평가/요약 저장


In [None]:
def r2_score(y_true: torch.Tensor, y_pred: torch.Tensor) -> torch.Tensor:
    """Coefficient of determination between two tensors, computed over all elements.

    Both inputs are flattened first. ``1e-8`` is added to the total sum of
    squares so a constant ``y_true`` does not divide by zero.
    """
    truth = y_true.reshape(-1)
    guess = y_pred.reshape(-1)
    residual_ss = (truth - guess).pow(2).sum()
    total_ss = (truth - truth.mean()).pow(2).sum() + 1e-8
    return 1.0 - residual_ss / total_ss

def summarize(tag, out, target):
    """Collect a result dict into a CPU-side summary entry.

    Args:
        tag: Label stored under ``'tag'``.
        out: Mapping with tensor entries ``'pressure'``, ``'thrust'`` and ``'z'``.
        target: Target curve that ``out['pressure']`` is scored against.

    Returns:
        dict with ``'tag'``, the R² score as a Python float under ``'r2'``, and
        detached CPU copies of ``'pressure'``, ``'thrust'`` and ``'z'``.
    """
    # Score on the prediction's own device rather than a notebook-global
    # config, so both operands are guaranteed to be co-located; detach because
    # only the number is needed.
    pressure = out['pressure'].detach()
    return {
        'tag': tag,
        'r2': r2_score(target.to(pressure.device), pressure).item(),
        'pressure': pressure.cpu(),
        'thrust': out['thrust'].detach().cpu(),
        'z': out['z'].detach().cpu(),
    }

# Gather the forward result and both reverse-design results. The forward entry
# keeps only pressure and thrust; the reverse entries go through summarize().
summary = {
    'forward': {
        'pressure': forward_out['pressure'].detach().cpu(),
        'thrust': forward_out['thrust'].detach().cpu(),
    },
    'reverse_single': summarize('single', reverse_single, target_curve),
    'reverse_loop': summarize('loop', reverse_out, target_curve),
}

# Persist the summary into the output directory created by the first code cell.
torch.save(summary, 'out_physgenrd/allinone_summary.pt')
print('Saved summary to out_physgenrd/allinone_summary.pt')
