# Documento de avaliação do treinamento

## Pastas que contém os arquivos resultados do treinamento.
#### Pasta com os resultados apresentados e analisados
- Pasta com ultimo treinamento realizado: dados\processados\treino 5
    - Contém:
        - Melhor peso resultado do treinamento
        - Histórico do treino em csv
        - Plots resultados do treinamento
    

### Configuração e carregamento do modelo:

In [18]:
# Célula 1: Configuração e Imports
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from scipy.stats import norm

# Importa os módulos do seu projeto
from src.config import PATHS, DATA_CONFIG, MODEL_CONFIG, VIZ_CONFIG
from src.model import PINN_BlackScholes
from src.physics import black_scholes_residual

print("Imports e configurações carregados.")

Imports e configurações carregados.


In [19]:
# Célula 2: Função Auxiliar de Black-Scholes
def black_scholes_call_numpy(S, K, T, r, sigma):
    """Função analítica de Black-Scholes em NumPy para plotagem."""
    T = np.where(T <= 1e-8, 1e-8, T)
    sigma = np.where(sigma <= 1e-8, 1e-8, sigma)
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

def delta_call_numpy(S, K, T, r, sigma):
    """Função analítica do Delta de uma Call em NumPy."""
    T = np.where(T <= 1e-8, 1e-8, T)
    sigma = np.where(sigma <= 1e-8, 1e-8, sigma)
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    return norm.cdf(d1)

print("Funções auxiliares definidas.")



Funções auxiliares definidas.


In [None]:
# Célula 3: Carregar Modelo e Dados
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
stats_path = os.path.join(PATHS['model_save_dir'], 'data_stats.json')
try:
    with open(stats_path, 'r') as f:
        data_stats = json.load(f)
    print("📊 Estatísticas de normalização do treinamento original carregadas com sucesso.")
    print(data_stats)
except FileNotFoundError:
    print(f"ERRO: Arquivo de estatísticas '{stats_path}' não encontrado. Execute o main.py para gerá-lo.")

# Instanciar o modelo
model = PINN_BlackScholes(config=MODEL_CONFIG, data_stats=data_stats)

# Carregar os pesos treinados
weights_path = "dados/processados/treino 5/best_model_weights.pth"
if os.path.exists(weights_path):
    model.load_state_dict(torch.load(weights_path, map_location=device))
    model.to(device)
    model.eval()
    print(f"Modelo carregado com sucesso de '{weights_path}' e movido para '{device}'.")
else:
    print(f"ERRO: Arquivo de pesos não encontrado em '{weights_path}'.")

# Carregar o histórico de treinamento
history_path = "dados/processados/treino 5/training_history.csv"
if os.path.exists(history_path):
    history_df = pd.read_csv(history_path)
    print("Histórico de treinamento carregado.")
else:
    history_df = None
    print("Aviso: Arquivo de histórico de treinamento não encontrado.")

In [None]:
# Carregar estatísticas do dataset (necessário para o modelo)
df_opcoes=pd.read_csv("../PINN_Opcoes_BR/dados/brutos/Databricks/PETR4_2024.csv", sep=',')
df_opcoes['time_to_maturity'] = df_opcoes['days_to_maturity'] / 252.0  # Convertendo dias para anos
data_stats = {
    'S_min': df_opcoes['spot_price'].min(), 
    'S_max': df_opcoes['spot_price'].max(),
    'K_min': df_opcoes['strike'].min(), 
    'K_max': df_opcoes['strike'].max(),
    'T_max': df_opcoes['time_to_maturity'].max(),
}

# Célula 4: Carregar e Preparar os Novos Dados para Avaliação

arquivo_para_avaliar = 'PETR4_2024.csv' 


file_path = os.path.join(PATHS['raw_data'], arquivo_para_avaliar)
print(f"Carregando e processando dados de '{file_path}'...")

# Carrega e limpa os dados
df_teste = pd.read_csv(file_path)
df_teste = df_teste[df_teste['option_type'] == 'CALL'].dropna()
df_teste = df_teste[(df_teste['premium'] > 0) & (df_teste['days_to_maturity'] > 0) & (df_teste['volatility'] > 0)]
df_teste['time_to_maturity'] = df_teste['days_to_maturity'] / 252.0
df_teste['r'] =  pd.read_csv(PATHS['selic_data'])['Selic'].iloc[-1] / 100.0 

df_teste['S_norm'] = (df_teste['spot_price'] - data_stats['S_min']) / (data_stats['S_max'] - data_stats['S_min'])
df_teste['K_norm'] = (df_teste['strike'] - data_stats['K_min']) / (data_stats['K_max'] - data_stats['K_min'])
df_teste['T_norm'] = df_teste['time_to_maturity'] / data_stats['T_max']

# Prepara os tensores para o modelo
input_features = ['S_norm', 'K_norm', 'T_norm', 'r', 'premium']
X_teste = torch.tensor(df_teste[input_features].values, dtype=torch.float32).to(device)
y_teste = torch.tensor(df_teste[['premium']].values, dtype=torch.float32).to(device)

In [None]:
# Célula 5: Gerar Previsões e Analisar Resultados
print("Gerando previsões com o modelo carregado...")

with torch.no_grad():
    output_teste = model(X_teste)
    price_pred = output_teste['price']
    sigma_pred = output_teste['sigma']

# Converte para NumPy para facilitar a plotagem
price_real_np = y_teste.cpu().numpy().flatten()
price_pred_np = price_pred.cpu().numpy().flatten()
df_teste['price_pred_pinn'] = price_pred_np
df_teste['sigma_pred_pinn'] = sigma_pred.cpu().numpy().flatten()

In [None]:
# Criar val_loader a partir de df_opcoes 
import torch
import numpy as np
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
import unicodedata, re

if 'df_opcoes' not in globals():
    print("df_opcoes não encontrado. Execute a célula que carrega o CSV antes.")
else:
    def norm_name(s):
        s2 = str(s).strip().lower()
        s2 = unicodedata.normalize('NFKD', s2).encode('ascii', 'ignore').decode('ascii')
        s2 = re.sub(r'[\s\.-]+', '_', s2)
        s2 = re.sub(r'[^0-9a-z_]', '', s2)
        return s2

    cols = list(df_opcoes.columns)
    norm_map = {c: norm_name(c) for c in cols}

    def find_column(candidates):
        for cand in candidates:
            for orig, norm in norm_map.items():
                if norm == cand:
                    return orig
        for cand in candidates:
            for orig, norm in norm_map.items():
                if cand in norm:
                    return orig
        return None

    spot_col = find_column(['spot_price', 'spot', 'price', 'preco', 'close', 'underlying_price'])
    strike_col = find_column(['strike', 'k', 'strike_price', 'preco_exercicio'])
    days_col = find_column(['days_to_maturity', 'days', 'dias', 'dias_para_vencimento'])
    ttm_col = find_column(['time_to_maturity', 'ttm', 'maturity', 'tenor'])
    premium_col = find_column(['premium', 'option_price', 'price', 'preco_opcao', 'last_price', 'preco'])

    print("Colunas detectadas:")
    print(f"  spot_col   -> {spot_col}")
    print(f"  strike_col -> {strike_col}")
    print(f"  days_col   -> {days_col}")
    print(f"  ttm_col    -> {ttm_col}")
    print(f"  premium_col-> {premium_col}")
    if not (spot_col and strike_col and (ttm_col or days_col)):
        print("Não foi possível identificar colunas essenciais (spot/strike/ttm). Liste as colunas disponíveis abaixo e ajuste manualmente:")
        print(cols)
    else:
        df = df_opcoes.copy()
        if ttm_col is None and days_col is not None:
            df['time_to_maturity'] = df[days_col].astype(float) / 252.0
            ttm_col_used = 'time_to_maturity'
        else:
            ttm_col_used = ttm_col if ttm_col is not None else 'time_to_maturity'

        # garantir numérico
        df[spot_col] = pd.to_numeric(df[spot_col], errors='coerce')
        df[strike_col] = pd.to_numeric(df[strike_col], errors='coerce')
        df[ttm_col_used] = pd.to_numeric(df[ttm_col_used], errors='coerce')

        # preencher data_stats caso não exista
        S_min = float(df[spot_col].min())
        S_max = float(df[spot_col].max())
        K_min = float(df[strike_col].min())
        K_max = float(df[strike_col].max())
        T_max = float(df[ttm_col_used].max())
        data_stats = {'S_min': S_min, 'S_max': S_max, 'K_min': K_min, 'K_max': K_max, 'T_max': T_max}
        print("data_stats calculado:", data_stats)

        # features
        S = df[spot_col].values
        K = df[strike_col].values
        T = df[ttm_col_used].values
        S_n = (S - S_min) / (S_max - S_min)
        K_n = (K - K_min) / (K_max - K_min)
        T_n = T / T_max
        r_fixed = 0.13
        # premium input (se existir usar, senão usar 1.0 placeholder)
        if premium_col and premium_col in df.columns:
            premium_in = pd.to_numeric(df[premium_col], errors='coerce').fillna(1.0).values
        else:
            premium_in = np.ones_like(S_n)

        # target (market premium). preferir premium_col se claramente preço da opção; caso contrário tentar 'option_price' ou 'premium'
        target = None
        for cand in ['option_price','premium','last_price','price','preco_opcao','preco']:
            col = find_column([cand])
            if col and col in df.columns:
                target = pd.to_numeric(df[col], errors='coerce').values
                break
        if target is None:
            # se não achou target, usar premium_in (mas avisa)
            target = premium_in
            print("Aviso: não foi encontrado target claro para preço de opção; usando coluna de premium/placeholder como target.")

        # remover linhas com NaN
        mask = (~np.isnan(S_n)) & (~np.isnan(K_n)) & (~np.isnan(T_n)) & (~np.isnan(target))
        X = np.vstack([S_n[mask], K_n[mask], T_n[mask], np.full(mask.sum(), r_fixed), premium_in[mask]]).T
        y = target[mask].astype(np.float32)

        X_tensor = torch.tensor(X, dtype=torch.float32)
        y_tensor = torch.tensor(y, dtype=torch.float32).reshape(-1,1)

        # split
        X_train, X_val, y_train, y_val = train_test_split(X_tensor.numpy(), y_tensor.numpy(), test_size=0.2, random_state=42)
        val_ds = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.float32))
        batch_size = 1024
        val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
        print(f"val_loader criado com {len(val_ds)} amostras (batch_size={batch_size}).")
# CÉLULA: Criar val_loader a partir de df_opcoes (executar)
import torch
import numpy as np
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
import unicodedata, re

if 'df_opcoes' not in globals():
    print("df_opcoes não encontrado. Execute a célula que carrega o CSV antes.")
else:
    def norm_name(s):
        s2 = str(s).strip().lower()
        s2 = unicodedata.normalize('NFKD', s2).encode('ascii', 'ignore').decode('ascii')
        s2 = re.sub(r'[\s\.-]+', '_', s2)
        s2 = re.sub(r'[^0-9a-z_]', '', s2)
        return s2

    cols = list(df_opcoes.columns)
    norm_map = {c: norm_name(c) for c in cols}

    def find_column(candidates):
        for cand in candidates:
            for orig, norm in norm_map.items():
                if norm == cand:
                    return orig
        for cand in candidates:
            for orig, norm in norm_map.items():
                if cand in norm:
                    return orig
        return None

    spot_col = find_column(['spot_price', 'spot', 'price', 'preco', 'close', 'underlying_price'])
    strike_col = find_column(['strike', 'k', 'strike_price', 'preco_exercicio'])
    days_col = find_column(['days_to_maturity', 'days', 'dias', 'dias_para_vencimento'])
    ttm_col = find_column(['time_to_maturity', 'ttm', 'maturity', 'tenor'])
    premium_col = find_column(['premium', 'option_price', 'price', 'preco_opcao', 'last_price', 'preco'])

    print("Colunas detectadas:")
    print(f"  spot_col   -> {spot_col}")
    print(f"  strike_col -> {strike_col}")
    print(f"  days_col   -> {days_col}")
    print(f"  ttm_col    -> {ttm_col}")
    print(f"  premium_col-> {premium_col}")
    if not (spot_col and strike_col and (ttm_col or days_col)):
        print("Não foi possível identificar colunas essenciais (spot/strike/ttm). Liste as colunas disponíveis abaixo e ajuste manualmente:")
        print(cols)
    else:
        df = df_opcoes.copy()
        if ttm_col is None and days_col is not None:
            df['time_to_maturity'] = df[days_col].astype(float) / 252.0
            ttm_col_used = 'time_to_maturity'
        else:
            ttm_col_used = ttm_col if ttm_col is not None else 'time_to_maturity'

        # garantir numérico
        df[spot_col] = pd.to_numeric(df[spot_col], errors='coerce')
        df[strike_col] = pd.to_numeric(df[strike_col], errors='coerce')
        df[ttm_col_used] = pd.to_numeric(df[ttm_col_used], errors='coerce')

        # preencher data_stats caso não exista
        S_min = float(df[spot_col].min())
        S_max = float(df[spot_col].max())
        K_min = float(df[strike_col].min())
        K_max = float(df[strike_col].max())
        T_max = float(df[ttm_col_used].max())
        data_stats = {'S_min': S_min, 'S_max': S_max, 'K_min': K_min, 'K_max': K_max, 'T_max': T_max}
        print("data_stats calculado:", data_stats)

        # features
        S = df[spot_col].values
        K = df[strike_col].values
        T = df[ttm_col_used].values
        S_n = (S - S_min) / (S_max - S_min)
        K_n = (K - K_min) / (K_max - K_min)
        T_n = T / T_max
        r_fixed = 0.13
        # premium input (se existir usar, senão usar 1.0 placeholder)
        if premium_col and premium_col in df.columns:
            premium_in = pd.to_numeric(df[premium_col], errors='coerce').fillna(1.0).values
        else:
            premium_in = np.ones_like(S_n)

        # target (market premium). preferir premium_col se claramente preço da opção; caso contrário tentar 'option_price' ou 'premium'
        target = None
        for cand in ['option_price','premium','last_price','price','preco_opcao','preco']:
            col = find_column([cand])
            if col and col in df.columns:
                target = pd.to_numeric(df[col], errors='coerce').values
                break
        if target is None:
            # se não achou target, usar premium_in (mas avisa)
            target = premium_in
            print("Aviso: não foi encontrado target claro para preço de opção; usando coluna de premium/placeholder como target.")

        # remover linhas com NaN
        mask = (~np.isnan(S_n)) & (~np.isnan(K_n)) & (~np.isnan(T_n)) & (~np.isnan(target))
        X = np.vstack([S_n[mask], K_n[mask], T_n[mask], np.full(mask.sum(), r_fixed), premium_in[mask]]).T
        y = target[mask].astype(np.float32)

        X_tensor = torch.tensor(X, dtype=torch.float32)
        y_tensor = torch.tensor(y, dtype=torch.float32).reshape(-1,1)

        # split
        X_train, X_val, y_train, y_val = train_test_split(X_tensor.numpy(), y_tensor.numpy(), test_size=0.2, random_state=42)
        val_ds = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.float32))
        batch_size = 1024
        val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
        print(f"val_loader criado com {len(val_ds)} amostras (batch_size={batch_size}).")

## Gráficos de para análise:

Preparo da grade para superfícies

In [None]:
# Preparar grade para superfícies (executa ao rodar)
import torch
import numpy as np

resolution = 50  # ajuste conforme necessário
print(f"Preparando grade com resolução = {resolution}...")
moneyness_vals = torch.linspace(0.8, 1.2, resolution, device=device)
T_norm_vals = torch.linspace(0.01, 1.0, resolution, device=device)
M_grid, T_grid = torch.meshgrid(moneyness_vals, T_norm_vals, indexing='ij')

K_fixed = (data_stats['K_max'] + data_stats['K_min']) / 2
S_vals = M_grid * K_fixed

S_norm_grid = (S_vals - data_stats['S_min']) / (data_stats['S_max'] - data_stats['S_min'])
K_norm_fixed = (K_fixed - data_stats['K_min']) / (data_stats['K_max'] - data_stats['K_min'])
K_norm_grid = torch.full_like(S_norm_grid, K_norm_fixed)

r_grid = torch.full_like(S_norm_grid, 0.13)
premium_grid = torch.full_like(S_norm_grid, 1.0)

model_input = torch.stack([
    S_norm_grid.flatten(), K_norm_grid.flatten(), T_grid.flatten(),
    r_grid.flatten(), premium_grid.flatten()
], dim=1).to(device)
print("Grade e model_input prontos.")

### Análise do Loss durante o treinamento

In [None]:
#Plot do histórico de perdas (executa ao rodar)
import os
import matplotlib.pyplot as plt

if 'history_df' not in globals() or history_df is None:
    print("Histórico não disponível (history_df is None). Pule esta célula após carregar o histórico.")
else:
    print("Gerando gráfico do histórico de perdas...")
    fig, ax1 = plt.subplots(figsize=(12, 7))

    ax1.plot(history_df['train_loss'], label='Perda de Treinamento', color='blue')
    ax1.plot(history_df['val_loss'], label='Perda de Validação', color='orange')
    ax1.set_xlabel('Época')
    ax1.set_ylabel('Loss (MSE)', color='blue')
    ax1.set_yscale('log')
    ax1.tick_params(axis='y', labelcolor='blue')
    ax1.grid(True, which="both", ls="--", axis='y')

    ax2 = ax1.twinx()
    if 'lr' in history_df.columns:
        ax2.plot(history_df['lr'], label='Taxa de Aprendizado (LR)', color='green', linestyle='--')
    ax2.set_ylabel('Taxa de Aprendizado', color='green')
    ax2.set_yscale('log')
    ax2.tick_params(axis='y', labelcolor='green')

    fig.suptitle('Histórico de Perdas e Taxa de Aprendizado')
    lines, labels = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    fig.legend(lines + lines2, labels + labels2, loc="upper right", bbox_to_anchor=(1,1), bbox_transform=ax1.transAxes)

    os.makedirs(PATHS['plot_save_dir'], exist_ok=True)
    save_path = os.path.join(PATHS['plot_save_dir'], 'loss_history.png')
    fig.savefig(save_path, bbox_inches='tight')
    plt.show()
    plt.close(fig)
    print(f"Gráfico de perdas salvo em: {save_path}")

In [None]:
# Mapa de calor do resíduo da PDE (processado em batches com grad)
import torch
import numpy as np
import matplotlib.pyplot as plt

batch_size = 1024
N = model_input.shape[0]
residual_all = np.zeros(N, dtype=float)

model.train()  # precisamos de grad, mas usaremos create_graph dentro da função
for start in range(0, N, batch_size):
    end = min(start + batch_size, N)
    inp = model_input[start:end].clone().detach().requires_grad_(True)
    out = model(inp)
    # black_scholes_residual espera (output_phy, inputs, data_stats) — ajustado ao seu módulo
    res = black_scholes_residual(out, inp, data_stats)
    residual_all[start:end] = torch.abs(res).cpu().detach().numpy().flatten()
print("Cálculo do resíduo concluído.")

residual_grid = residual_all.reshape(M_grid.shape)

fig, ax = plt.subplots(figsize=(12, 8))
M_cpu = M_grid.cpu().numpy()
T_cpu = T_grid.cpu().numpy() * data_stats['T_max'] * 252
c = ax.pcolormesh(M_cpu, T_cpu, residual_grid, cmap='hot', shading='gouraud', vmax=np.percentile(residual_grid, 99))
ax.set_title('Mapa de Calor do Resíduo Absoluto da EDP de Black-Scholes')
ax.set_xlabel('Moneyness (S/K)')
ax.set_ylabel('Dias para o Vencimento')
fig.colorbar(c, ax=ax, label='|Resíduo da EDP|')

os.makedirs(PATHS['plot_save_dir'], exist_ok=True)
save_path = os.path.join(PATHS['plot_save_dir'], 'pde_residual_surface.png')
fig.savefig(save_path, bbox_inches='tight')
plt.show()
plt.close(fig)
print(f"Gráfico do resíduo da EDP salvo em: {save_path}")

model.eval()

In [None]:
# Evolução dos Pesos da Perda

fig, ax1 = plt.subplots(figsize=(12, 7))

# Eixo esquerdo para o peso dos dados
ax1.plot(history_df['weight_data'], label='Peso da Perda de Dados (λ_data)', color='deepskyblue')
ax1.set_xlabel('Época')
ax1.set_ylabel('Peso da Perda de Dados', color='deepskyblue')
ax1.set_yscale('log')
ax1.tick_params(axis='y', labelcolor='deepskyblue')
ax1.grid(True, which="both", ls="--", axis='y')        
# Eixo direito para o peso da física (PDE)
ax2 = ax1.twinx()
ax2.plot(history_df['weight_pde'], label='Peso da Perda da Física (λ_pde)', color='crimson', linestyle='--')
ax2.set_ylabel('Peso da Perda da Física', color='crimson')
ax2.set_yscale('log')
ax2.tick_params(axis='y', labelcolor='crimson')
fig.suptitle('Evolução dos Pesos da Perda (Ponderação Adaptativa)')
# Coleta os handles e labels de ambos os eixos para uma legenda unificada
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax2.legend(lines + lines2, labels + labels2, loc='upper center')
plt.show()

### Análise do Delta e das Gregas

In [None]:
# Acurácia das Gregas (Delta)
moneyness_vals = torch.linspace(0.8, 1.2, 100, device=device)
T_norm_vals = torch.tensor([0.1, 0.5, 0.9], device=device) # Vencimentos curto, médio e longo

K_fixed = (data_stats['K_max'] + data_stats['K_min']) / 2
K_norm_fixed = (K_fixed - data_stats['K_min']) / (data_stats['K_max'] - data_stats['K_min'])
r_fixed = 0.13

plt.figure(figsize=(12, 7))

for T_n in T_norm_vals:
    S_vals = moneyness_vals * K_fixed
    S_norm_vals = (S_vals - data_stats['S_min']) / (data_stats['S_max'] - data_stats['S_min'])
    
    inputs = torch.stack([
        S_norm_vals,
        torch.full_like(S_norm_vals, K_norm_fixed),
        torch.full_like(S_norm_vals, T_n.item()),
        torch.full_like(S_norm_vals, r_fixed),
        torch.ones_like(S_norm_vals) # Premium placeholder
    ], dim=1).requires_grad_(True)
    
    output = model(inputs)
    V = output['price']
    sigma_pinn = output['sigma']
    
    # Delta da PINN (via Autograd)
    V_grads = torch.autograd.grad(V.sum(), inputs, create_graph=True)[0]
    V_S_norm = V_grads[:, 0]
    delta_pinn = V_S_norm / (data_stats['S_max'] - data_stats['S_min'])
    
    # Delta Analítico (usando a vol da PINN)
    T_val = (T_n * data_stats['T_max']).item()
    delta_bs = delta_call_numpy(S_vals.cpu().detach().numpy(), K_fixed, T_val, r_fixed, sigma_pinn.cpu().detach().numpy().flatten())
    
    plt.plot(moneyness_vals.cpu().numpy(), delta_pinn.cpu().detach().numpy(), label=f'Delta PINN (T={T_val*252:.0f} dias)')
    plt.plot(moneyness_vals.cpu().numpy(), delta_bs, '--', label=f'Delta Analítico (T={T_val*252:.0f} dias)')

plt.title("Comparação do Delta (PINN vs. Analítico)")
plt.xlabel("Moneyness (S/K)")
plt.ylabel("Delta (∂V/∂S)")
plt.legend()
plt.grid(True)
plt.show()
save_path = os.path.join(PATHS['plot_save_dir'], 'delta_accuracy.png')
fig.savefig(save_path)
print(f"Gráfico de acurácia do Delta salvo em: {save_path}")

In [None]:
# Superfície do Delta Aprendido pela PINN

def prepare_surface_data(resolution=50): # Resolução menor para testes mais rápidos
    moneyness_vals = torch.linspace(0.8, 1.2, resolution, device=device)
    T_norm_vals = torch.linspace(0.01, 1, resolution, device=device)        
    M_grid, T_grid = torch.meshgrid(moneyness_vals, T_norm_vals, indexing='ij')        
    K_fixed = (data_stats['K_max'] + data_stats['K_min']) / 2
    S_vals = M_grid * K_fixed
    S_norm_grid = (S_vals - data_stats['S_min']) / (data_stats['S_max'] - data_stats['S_min'])
    K_norm_fixed = (K_fixed - data_stats['K_min']) / (data_stats['K_max'] - data_stats['K_min'])
    K_norm_grid = torch.full_like(S_norm_grid, K_norm_fixed)
    r_grid = torch.full_like(S_norm_grid, 0.13)
    premium_grid = torch.full_like(S_norm_grid, 1.0)
    model_input = torch.stack([
        S_norm_grid.flatten(), K_norm_grid.flatten(), T_grid.flatten(), 
        r_grid.flatten(), premium_grid.flatten()
    ], dim=1)        
    return M_grid, T_grid, model_input

M_grid, T_grid, model_input = prepare_surface_data()
model_input.requires_grad = True
output = model(model_input)
V = output['price']
V_grads = torch.autograd.grad(V, model_input, grad_outputs=torch.ones_like(V), create_graph=True)[0]
V_S_norm = V_grads[:, 0]        
delta = V_S_norm / (data_stats['S_max'] - data_stats['S_min'])
delta_surface = delta.reshape_as(M_grid)
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')
M_cpu = M_grid.cpu().detach().numpy()
T_cpu = T_grid.cpu().detach().numpy() * data_stats['T_max'] * 252
delta_cpu = delta_surface.cpu().detach().numpy()        
surf = ax.plot_surface(M_cpu, T_cpu, delta_cpu, cmap='cividis', edgecolor='none')
ax.set_title('Superfície do Delta Aprendido pela PINN')
ax.set_xlabel('Moneyness (S/K)')
ax.set_ylabel('Dias para o Vencimento')
ax.set_zlabel('Delta (∂V/∂S)')
ax.set_zlim(0, 1)
fig.colorbar(surf, shrink=0.5, aspect=5)
plt.show()

### Análise da superfície da predição do PINN

In [None]:
# Comparação de superfícies de preço (PINN vs. Analítico)
resolution=40
s_vals = np.linspace(data_stats['S_min'], data_stats['S_max'], resolution)
t_vals = np.linspace(0.01, data_stats['T_max'], resolution)
S_grid, T_grid = np.meshgrid(s_vals, t_vals)
K_fixed = (data_stats['K_max'] + data_stats['K_min']) / 2
r_fixed = 0.13
C_pinn = np.zeros_like(S_grid)
sigma_pinn_grid = np.zeros_like(S_grid)
with torch.no_grad():
    for i in range(resolution):
        for j in range(resolution):
            S, T = S_grid[i, j], T_grid[i, j]
            S_n = (S - data_stats['S_min']) / (data_stats['S_max'] - data_stats['S_min'])
            T_n = T / data_stats['T_max']
            K_n = (K_fixed - data_stats['K_min']) / (data_stats['K_max'] - data_stats['K_min'])
            inp = torch.tensor([[S_n, K_n, T_n, r_fixed, 1.0]], dtype=torch.float32).to(device)
            output = model(inp)
            C_pinn[i, j] = output['price'].item()
            sigma_pinn_grid[i, j] = output['sigma'].item()
C_bs = black_scholes_call_numpy(S_grid, K_fixed, T_grid, r_fixed, np.mean(sigma_pinn_grid))
fig = plt.figure(figsize=(16, 7))
ax1 = fig.add_subplot(121, projection='3d')
ax2 = fig.add_subplot(122, projection='3d')
ax1.plot_surface(S_grid, T_grid * 252, C_pinn, cmap='viridis')
ax1.set_title("Superfície de Preço da PINN")
ax1.set_xlabel('Preço do Ativo (S)'); ax1.set_ylabel('Dias para Vencimento'); ax1.set_zlabel('Preço da Opção (V)')
ax2.plot_surface(S_grid, T_grid * 252, C_bs, cmap='plasma')
ax2.set_title("Superfície Analítica de Black-Scholes (Vol Média da PINN)")
ax2.set_xlabel('Preço do Ativo (S)'); ax2.set_ylabel('Dias para Vencimento'); ax2.set_zlabel('Preço da Opção (V)')
plt.tight_layout()
plt.show()


In [None]:
# Previsão vs Real (scatter) - usa val_loader do notebook
import numpy as np
import matplotlib.pyplot as plt

if 'val_loader' not in globals():
    print("val_loader não encontrado. Crie/defina val_loader antes de rodar esta célula.")
else:
    model.eval()
    actuals = []
    predictions = []
    with torch.no_grad():
        for inputs, premiums_real in val_loader:
            inputs = inputs.to(device)
            price_pred = model(inputs)['price'].cpu().numpy().flatten()
            actuals.extend(premiums_real.cpu().numpy().flatten())
            predictions.extend(price_pred)
    actuals = np.array(actuals)
    predictions = np.array(predictions)

    fig = plt.figure(figsize=(10, 10))
    plt.scatter(actuals, predictions, alpha=0.3, label='Previsões do Modelo')
    min_val = min(actuals.min(), predictions.min())
    max_val = max(actuals.max(), predictions.max())
    plt.plot([min_val, max_val], [min_val, max_val], 'r--', label='Linha de Perfeição (y=x)')
    plt.xlabel('Preço de Mercado Real (Prêmio)')
    plt.ylabel('Preço Previsto pela PINN')
    plt.title('Comparação entre Preço Real e Previsto (Conjunto de Validação)')
    plt.legend()
    plt.grid(True)
    plt.axis('equal')

    os.makedirs(PATHS['plot_save_dir'], exist_ok=True)
    save_path = os.path.join(PATHS['plot_save_dir'], 'prediction_vs_actual.png')
    fig.savefig(save_path, bbox_inches='tight')
    plt.show()
    plt.close(fig)
    print(f"Gráfico de comparação de preços salvo em: {save_path}")

In [None]:
# Superfície de Erro de Precificação
resolution = 50
s_vals = np.linspace(data_stats['S_min'], data_stats['S_max'], resolution)
t_vals = np.linspace(0.01, data_stats['T_max'], resolution)
S_grid, T_grid = np.meshgrid(s_vals, t_vals)
K_fixed = (data_stats['K_max'] + data_stats['K_min']) / 2
r_fixed = 0.13

C_pinn = np.zeros_like(S_grid)
sigma_pinn_grid = np.zeros_like(S_grid)

with torch.no_grad():
    for i in range(resolution):
        for j in range(resolution):
            S, T = S_grid[i, j], T_grid[i, j]
            S_n = (S - data_stats['S_min']) / (data_stats['S_max'] - data_stats['S_min'])
            T_n = T / data_stats['T_max']
            K_n = (K_fixed - data_stats['K_min']) / (data_stats['K_max'] - data_stats['K_min'])
            inp = torch.tensor([[S_n, K_n, T_n, r_fixed, 1.0]], dtype=torch.float32).to(device)
            output = model(inp)
            C_pinn[i, j] = output['price'].item()
            sigma_pinn_grid[i, j] = output['sigma'].item()

C_bs = black_scholes_call_numpy(S_grid, K_fixed, T_grid, r_fixed, np.mean(sigma_pinn_grid))
Price_Error_Surface = np.abs(C_pinn - C_bs)

fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')
surf = ax.plot_surface(S_grid, T_grid * 252, Price_Error_Surface, cmap='hot')
ax.set_title("Superfície de Erro Absoluto de Preço (PINN vs. BS Analítico)")
ax.set_xlabel('Preço do Ativo (S)')
ax.set_ylabel('Dias para Vencimento')
ax.set_zlabel('Erro Absoluto de Preço')
fig.colorbar(surf, shrink=0.5, aspect=5)
plt.show()
save_path = os.path.join(PATHS['plot_save_dir'], 'price_error_surface.png')
fig.savefig(save_path)
print(f"Gráfico da superfície de erro de preço salvo em: {save_path}")

### Análise da superfície da volatilidade

In [None]:
# Superfície de Volatilidade Implícita Aprendida (PINN)
M_grid, T_grid, model_input = prepare_surface_data()
with torch.no_grad():
    sigma_pred = model(model_input)['sigma'].reshape_as(M_grid) * 100
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')        
M_cpu = M_grid.cpu().numpy()
T_cpu = T_grid.cpu().numpy() * data_stats['T_max'] * 252
sigma_cpu = sigma_pred.cpu().numpy()
surf = ax.plot_surface(M_cpu, T_cpu, sigma_cpu, cmap='viridis', edgecolor='none')        
ax.set_title('Superfície de Volatilidade Implícita Aprendida (PINN)')
ax.set_xlabel('Moneyness (S/K)')
ax.set_ylabel('Dias para o Vencimento')
ax.set_zlabel('Volatilidade Implícita (%)')
fig.colorbar(surf, shrink=0.5, aspect=5)
plt.show()

In [None]:
# Curva de Volatilidade Implícita Aprendida

model.eval()
vol_real_list = []
vol_pred_list = []
with torch.no_grad():
    for inputs, _ in val_loader:
        # A volatilidade real (histórica) não é usada no input, mas a temos no dataset
        # Vamos reconstruir o input sem a vol para o modelo, mas guardar a vol para comparação
        # A 5ª coluna (índice 4) é o prêmio, a vol não está aqui. Precisamos carregar os dados completos.
        pass # Esta lógica precisa ser ajustada para ter acesso à volatilidade original
# SIMPLIFICAÇÃO: Para este exemplo, vamos plotar a vol prevista em relação ao moneyness
M_grid, _, model_input = prepare_surface_data(resolution=100)
with torch.no_grad():
    sigma_pred = model(model_input)['sigma'].reshape_as(M_grid) * 100        
plt.figure(figsize=(10, 7))
# Plotamos um "corte" da superfície de volatilidade para um vencimento médio
T_median_idx = sigma_pred.shape[1] // 2
plt.plot(M_grid[:, T_median_idx].cpu().numpy(), sigma_pred[:, T_median_idx].cpu().numpy(), label=f'Corte da Vol. Implícita (PINN)')        
plt.xlabel('Moneyness (S/K)')
plt.ylabel('Volatilidade Implícita (%)')
plt.title('Curva de Volatilidade Implícita Aprendida (Vencimento Fixo)')
plt.legend()
plt.grid(True)

### Análise em relação a moneyness

In [None]:
# CÉLULA: Erro por moneyness (usa val_loader)
import numpy as np
import matplotlib.pyplot as plt

if 'val_loader' not in globals():
    print("val_loader não encontrado. Crie/defina val_loader antes de rodar esta célula.")
else:
    model.eval()
    moneyness_vals = []
    errors = []
    with torch.no_grad():
        for inputs, premiums_real in val_loader:
            S_norm = inputs[:, 0].cpu()
            K_norm = inputs[:, 1].cpu()
            S = S_norm * (data_stats['S_max'] - data_stats['S_min']) + data_stats['S_min']
            K = K_norm * (data_stats['K_max'] - data_stats['K_min']) + data_stats['K_min']
            moneyness = (S / K).numpy()

            inputs = inputs.to(device)
            price_pred = model(inputs)['price'].cpu().numpy().flatten()
            premiums = premiums_real.cpu().numpy().flatten()

            valid_indices = premiums > 1e-6
            if valid_indices.sum() == 0:
                continue
            error_pct = (price_pred[valid_indices] - premiums[valid_indices]) / premiums[valid_indices]

            moneyness_vals.extend(moneyness[valid_indices])
            errors.extend(error_pct)

    moneyness_vals = np.array(moneyness_vals)
    errors = np.array(errors)

    fig = plt.figure(figsize=(12, 7))
    plt.scatter(moneyness_vals, errors, alpha=0.1)
    plt.axhline(0, color='red', linestyle='--')
    plt.xlabel('Moneyness (S/K)')
    plt.ylabel('Erro Percentual de Previsão ((Prev - Real) / Real)')
    plt.title('Distribuição do Erro de Previsão por Moneyness')
    plt.grid(True)

    os.makedirs(PATHS['plot_save_dir'], exist_ok=True)
    save_path = os.path.join(PATHS['plot_save_dir'], 'error_by_moneyness.png')
    fig.savefig(save_path, bbox_inches='tight')
    plt.show()
    plt.close(fig)
    print(f"Gráfico de erro por moneyness salvo em: {save_path}")