
# Template: PINN + Curvas de CFD (genérico e seguro)

> **Objetivo:** fornecer uma **pipeline genérica** para integrar **curvas de CFD** externas (por ex., `condição × coeficientes`) em um **modelo físico‑informado** (PINN) com **varredura de parâmetros** e **otimização multiobjetivo**.  
> **Uso previsto:** aplicações **civis/educacionais** (p. ex., perfis aerodinâmicos genéricos, pás de turbina, geometrias de dutos).  
>
> ⚠️ **Não use este caderno para armamentos.** O conteúdo é estritamente educacional em domínios inofensivos.



## 1) Setup do ambiente

- Requer Python 3.9+ e PyTorch 2.x
- Instalação sugerida:
```bash
conda create -n pinn-cfd python=3.10 -y
conda activate pinn-cfd
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
pip install numpy pandas matplotlib tqdm pydantic
```


In [None]:
# Imports
import os, json, math, random, time, pathlib
from dataclasses import dataclass
from typing import Dict, Tuple, List, Optional

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.autograd as autograd

try:
    from tqdm import trange
    _HAS_TQDM = True
except Exception:
    _HAS_TQDM = False

print('Torch:', torch.__version__)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device



## 2) Entrada de **curvas de CFD** (externas)

Esperado um ou mais CSVs (ou DataFrames) com colunas como:
- `condicao` (ex.: número de Mach, Reynolds, ângulo de ataque etc.)
- `Cd` (coeficiente de arrasto) — opcional
- `Cl` (coeficiente de sustentação) — opcional
- `Cm` (coeficiente de momento) — opcional

Adapte os nomes conforme suas curvas. O código abaixo mostra **exemplos de leitura**.


In [None]:
# === Exemplo: leitura de CSV ===
# Substitua 'path/to/...' pelos seus arquivos. Você pode carregar múltiplos conjuntos.
# Se estiver no Jupyter, arraste o CSV para a pasta de trabalho e atualize o caminho.
paths = {
    'dataset_1': 'path/to/curvas_cfd_1.csv',  # <- troque
    # 'dataset_2': 'path/to/curvas_cfd_2.csv',
}

def load_cfd_curves(paths_dict):
    dfs = {}
    for name, p in paths_dict.items():
        if not os.path.exists(p):
            print(f"[AVISO] Arquivo não encontrado: {p}. Pule ou atualize o caminho.")
            continue
        df = pd.read_csv(p)
        dfs[name] = df
    return dfs

cfd_data = load_cfd_curves(paths)
list(cfd_data.keys())



## 3) Pré‑processamento e normalização

- Seleciona colunas de interesse e normaliza para melhorar o treinamento.
- Se houver múltiplos datasets (regiões de operação), eles serão concatenados com uma tag de origem.


In [None]:
# Escolha as colunas que você realmente tem:
feature_cols = ['condicao']            # ex.: ['Mach'] ou ['Re', 'AoA']
target_cols  = ['Cd', 'Cl', 'Cm']      # deixe só o que tiver, e.g. ['Cd']

def build_dataset(cfd_dict, feature_cols, target_cols):
    frames = []
    for name, df in cfd_dict.items():
        # Filtra colunas disponíveis
        keep = [c for c in feature_cols + target_cols if c in df.columns]
        sub = df[keep].copy()
        sub['__src__'] = name
        frames.append(sub)
    if not frames:
        raise RuntimeError("Nenhum dataset carregado. Atualize os caminhos/colunas.")
    full = pd.concat(frames, ignore_index=True)
    # Normalização simples [0,1] por coluna
    stats = {}
    for c in feature_cols + target_cols:
        if c in full.columns:
            cmin, cmax = float(full[c].min()), float(full[c].max())
            if cmax == cmin:
                cmax = cmin + 1e-6
            stats[c] = {'min': cmin, 'max': cmax}
            full[c + '_norm'] = (full[c] - cmin) / (cmax - cmin)
    return full, stats

if cfd_data:
    df_all, norm_stats = build_dataset(cfd_data, feature_cols, target_cols)
    display(df_all.head())
    print(json.dumps(norm_stats, indent=2))
else:
    df_all, norm_stats = None, None



## 4) Parâmetros de **projeto genéricos** (geometria)

Defina um **vetor de projeto** \(\mathbf{p}\) com limites inferiores/superiores.  
> *Exemplos civis*: parâmetros de um perfil aerodinâmico genérico (espessura, camber, razão de aspecto), canal de escoamento etc.


In [None]:
# Exemplo seguro de parâmetros de projeto:
design_space = {
    'espessura':  (0.05, 0.20),   # valores adimensionais [min, max]
    'camber':     (0.00, 0.10),
    'razao_aspecto': (2.0, 8.0),
}

def sample_design(n=1):
    xs = []
    for _ in range(n):
        p = {k: np.random.uniform(v[0], v[1]) for k, v in design_space.items()}
        xs.append(p)
    return xs

sample_design(3)



## 5) Modelo PINN (surrogate físico‑informado)

- Entrada: **(parâmetros de projeto, condição de operação)** normalizados  
- Saída: **coeficientes** (ex.: `Cd`, `Cl`, `Cm`) normalizados
- *Loss* = **dados (curvas CFD)** + **termo físico** (restrições genéricas) + **regularizações**


In [None]:
class MLP(nn.Module):
    def __init__(self, in_dim, out_dim, width=256, depth=6):
        super().__init__()
        layers = []
        dims = [in_dim] + [width]*depth + [out_dim]
        for i in range(len(dims)-2):
            layers += [nn.Linear(dims[i], dims[i+1]), nn.Tanh()]
        layers += [nn.Linear(dims[-2], dims[-1])]
        self.net = nn.Sequential(*layers)
        # Xavier init
        for m in self.net:
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight); nn.init.zeros_(m.bias)

    def forward(self, x):
        return self.net(x)

# Helper: normalização/denormalização
def norm(x, stats):  # x: dict
    out = {}
    for k, v in x.items():
        mn, mx = stats[k]['min'], stats[k]['max']
        out[k+'_norm'] = (v - mn)/(mx - mn + 1e-12)
    return out

def denorm_vec(y, cols, stats):
    y = y.clone().detach().cpu().numpy()
    out = {}
    for i, c in enumerate(cols):
        if c in stats:
            mn, mx = stats[c]['min'], stats[c]['max']
            out[c] = y[:, i]*(mx - mn) + mn
    return out



### 5.1) **Termo físico genérico** (placeholders)

Como o termo físico depende do problema seguro escolhido, deixamos **ganchos** exemplificativos:

- **Suavidade** das curvas (penaliza derivadas muito grandes).
- **Monotonicidade** opcional (p.ex., `Cd` crescente em determinada faixa).
- **Limites físicos** (coeficientes em intervalos plausíveis).

Adapte as funções abaixo ao seu domínio **não bélico**.


In [None]:
def physics_penalty(outputs, inputs, cols_out, lam_smooth=1e-3, lam_bounds=1e-3, bounds=None):
    """ 
    outputs: tensor (N, O) normalizado
    inputs:  tensor (N, D)  normalizado (últimas colunas podem ser condições)
    cols_out: lista com nomes dos targets na mesma ordem da rede
    bounds: dict opcional com limites *desnormalizados* por variável
    """
    loss = torch.tensor(0.0, device=outputs.device)
    # Suavidade via variação total aproximada
    diff = outputs[1:] - outputs[:-1]
    loss = loss + lam_smooth * torch.mean(diff**2)

    # Limites físicos (se fornecidos)
    if bounds is not None:
        for j, name in enumerate(cols_out):
            if name in bounds:
                lo, hi = bounds[name]
                # Converte limites p/ espaço normalizado ~ heurístico
                # Se stats não estiver disponível aqui, aplique bounds no espaço real externamente.
                # Aqui mantemos um "clamp loss" simples como placeholder.
                loss = loss + lam_bounds * torch.mean(torch.relu(outputs[:, j] - 1.2))  # >1 (~fora do [0,1])
                loss = loss + lam_bounds * torch.mean(torch.relu(0.0 - outputs[:, j]))  # <0
    return loss



## 6) Montagem do dataset e treino

- **X** = concatenação de **parâmetros de projeto** (amostrados) + **condições** das curvas de CFD.  
- **Y** = coeficientes alvos (a partir das curvas).  
- Treinaremos um **surrogate físico‑informado** para prever os coeficientes em novas combinações (projeto, condição).


In [None]:
# Preparação dos tensores (exemplo)
if df_all is not None:
    # Features = [design params (amostrados)] + [condições do CFD]
    n_design = 256  # quantidade de amostras de projeto para combinar
    designs = sample_design(n_design)

    # Expande condições do CFD para cada design
    feats = []
    tgts  = []
    src_cols = [c for c in feature_cols if c+'_norm' in df_all.columns]
    out_cols = [c for c in target_cols  if c+'_norm' in df_all.columns]

    for p in designs:
        # Normaliza projeto em [0,1] com limites do design_space
        p_norm = {k+'_norm': (p[k]-design_space[k][0])/(design_space[k][1]-design_space[k][0]) for k in design_space}
        P = pd.DataFrame([p_norm]*len(df_all))

        feats.append(pd.concat([P.reset_index(drop=True), df_all[[c+'_norm' for c in src_cols]].reset_index(drop=True)], axis=1))
        tgts.append(df_all[[c+'_norm' for c in out_cols]].reset_index(drop=True))

    X_df = pd.concat(feats, ignore_index=True)
    Y_df = pd.concat(tgts,  ignore_index=True)

    X = torch.tensor(X_df.values, dtype=torch.float32, device=device)
    Y = torch.tensor(Y_df.values, dtype=torch.float32, device=device)
    print('X shape:', X.shape, '| Y shape:', Y.shape)
else:
    X = Y = None


In [None]:
# Treino (Adam) com termo de física genérico
def train_surrogate(X, Y, in_dim, out_dim, epochs=20000, lr=1e-3, width=256, depth=6, log_every=1000):
    model = MLP(in_dim, out_dim, width=width, depth=depth).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)

    hist = []
    iterator = trange(epochs, desc='Treino', leave=False) if _HAS_TQDM else range(epochs)
    for it in iterator:
        opt.zero_grad()
        pred = model(X)
        loss_data = torch.mean((pred - Y)**2)
        loss_phys = physics_penalty(pred, X, cols_out=[c for c in Y_df.columns])
        loss = loss_data + loss_phys
        loss.backward()
        opt.step()

        if (it+1) % log_every == 0 or it == 0 or it == epochs-1:
            hist.append({'iter': it+1, 'loss': float(loss.detach().cpu()),
                         'data': float(loss_data.detach().cpu()),
                         'phys': float(loss_phys.detach().cpu())})
            if not _HAS_TQDM:
                print(hist[-1])
    return model, hist

if X is not None:
    model, history = train_surrogate(X, Y, in_dim=X.shape[1], out_dim=Y.shape[1], epochs=20000, lr=1e-3)



## 7) Função objetivo (multiobjetivo → escalar)

Defina **o que otimizar** (aplicação segura). Ex.: minimizar `Cd` médio e manter `Cl` próximo de um alvo; penalizar `Cm` fora de faixa.  
Use uma **agregação escalar** (soma ponderada) ou um algoritmo multiobjetivo (ex.: NSGA‑II). Abaixo, mostramos a abordagem simples.


In [None]:
# Exemplo de objetivo seguro (ajuste conforme seu caso)
def objective_scalar(pred_metrics, w_cd=1.0, w_cl=0.1, cl_target=0.0, w_cm=0.1):
    # pred_metrics: dict com arrays (em espaço real) para chaves presentes
    score = 0.0
    if 'Cd' in pred_metrics:
        score += w_cd * float(np.mean(pred_metrics['Cd']))
    if 'Cl' in pred_metrics:
        score += w_cl * float(np.mean((pred_metrics['Cl'] - cl_target)**2))
    if 'Cm' in pred_metrics:
        score += w_cm * float(np.mean(np.maximum(0.0, np.abs(pred_metrics['Cm']) - 0.1)))  # faixa de estabilidade genérica
    return score



## 8) Varredura de projetos (até 1000 modelos/rodadas)

- **Duas camadas** de varredura possíveis:
  1. **Hiperparâmetros** do surrogate (largura, profundidade, LR) e/ou pesos de loss.
  2. **Exploração do espaço de projeto** (amostrar \(\mathbf{p}\), avaliar com o surrogate, calcular objetivo).

Abaixo implementamos a **camada 2** (busca no espaço de projeto). A camada 1 segue o mesmo padrão.


In [None]:
def evaluate_designs(model, n_candidates=1000):
    cand = sample_design(n_candidates)
    results = []
    for p in cand:
        # Monta grid de condições (das suas curvas)
        cond_df = df_all[[c for c in df_all.columns if c.endswith('_norm') and c.replace('_norm','') in feature_cols]]
        # Normaliza o design
        p_norm = {k+'_norm': (p[k]-design_space[k][0])/(design_space[k][1]-design_space[k][0]) for k in design_space}
        P = pd.DataFrame([p_norm]*len(cond_df))
        X_eval = pd.concat([P.reset_index(drop=True), cond_df.reset_index(drop=True)], axis=1)
        X_eval_t = torch.tensor(X_eval.values, dtype=torch.float32, device=device)
        with torch.no_grad():
            Y_pred = model(X_eval_t).cpu().numpy()
        # Desnormaliza previsões
        pred = {}
        for j, name in enumerate([c for c in target_cols if c+'_norm' in df_all.columns]):
            mn, mx = norm_stats[name]['min'], norm_stats[name]['max']
            pred[name] = Y_pred[:, j]*(mx - mn) + mn
        score = objective_scalar(pred)
        results.append({'params': p, 'score': score})
    results = sorted(results, key=lambda d: d['score'])
    return results

if X is not None:
    ranked = evaluate_designs(model, n_candidates=200)  # aumente para 1000+ conforme compute
    ranked[:5]



## 9) Exportar o melhor resultado e métricas


In [None]:
outdir = 'runs_generic'
os.makedirs(outdir, exist_ok=True)
torch.save(model.state_dict(), os.path.join(outdir, 'surrogate_best.pt'))
with open(os.path.join(outdir, 'top_designs.json'), 'w') as f:
    json.dump(ranked[:20], f, indent=2)
print('Arquivos salvos em:', outdir)



## 10) Visualizações rápidas

> Observação: ao fazer gráficos, usamos **matplotlib** e **um único gráfico por figura**, sem estilos/cores específicas, conforme diretrizes deste ambiente.


In [None]:
import matplotlib.pyplot as plt

def plot_curve(x, y, xlabel='condição', ylabel='coeficiente', title='Curva prevista'):
    plt.figure()
    plt.plot(x, y)
    plt.xlabel(xlabel); plt.ylabel(ylabel); plt.title(title)
    plt.show()

# Exemplo: plota Cd previsto do melhor design para as condições do dataset
if X is not None:
    best = ranked[0]['params']
    cond_df = df_all[[c for c in df_all.columns if c.endswith('_norm') and c.replace('_norm','') in feature_cols]]
    p_norm = {k+'_norm': (best[k]-design_space[k][0])/(design_space[k][1]-design_space[k][0]) for k in design_space}
    P = pd.DataFrame([p_norm]*len(cond_df))
    X_eval = pd.concat([P.reset_index(drop=True), cond_df.reset_index(drop=True)], axis=1)

    # Corrige reset_index param name for pandas versions


In [None]:
X_eval_t = torch.tensor(X_eval.values, dtype=torch.float32, device=device)
with torch.no_grad():
    Y_pred = model(X_eval_t).cpu().numpy()

if 'Cd' in target_cols and 'Cd_norm' in df_all.columns:
    x_real = df_all[feature_cols[0]].values  # assume 1D condição para plot simples
    j = [c for c in target_cols if c+'_norm' in df_all.columns].index('Cd')
    mn, mx = norm_stats['Cd']['min'], norm_stats['Cd']['max']
    cd_real = Y_pred[:, j]*(mx - mn) + mn
    plot_curve(x_real, cd_real, xlabel=feature_cols[0], ylabel='Cd', title='Cd (previsto)')



---

### Notas finais
- Este caderno é um **template**. O *termo físico* deve ser adaptado a um problema **seguro** (ex.: conservação de massa/energia em dutos, estabilidade de perfis civis, etc.).  
- Para **varredura de hiperparâmetros** do surrogate, crie laços externos variando `width`, `depth`, `lr` e pesos da *loss*.  
- Para rodadas grandes (1000+), use **checkpointing**, **seed fixo** e, se possível, aceleração por GPU/TPU.

**Foco sempre em aplicações civis/educacionais.**
