# Definindo sistema


In [1]:
import numpy as np
import torch
from scipy.integrate import solve_ivp

from plots import plot_tanks

np.random.seed(42)

# Constantes
a1 = 50  # cm^2
a2 = 50  # cm^2
cv1 = 1
cv2 = 1


def F(t, op=np):
    f = 5 - (t**2) / 3000
    return op.where(f < 0.0, 0.0, f)


def edo_np(t, Y):
    h1, h2 = Y[0], Y[1]

    # Tanks limits
    h1 = np.where(h1 < 0.0, 0.0, h1)
    h2 = np.where(h2 < 0.0, 0.0, h2)

    # Equations
    dh1dt = (F(t) - cv1 * np.sqrt(h1)) / a1
    dh2dt = (cv1 * np.sqrt(h1) - cv2 * np.sqrt(h2)) / a2
    return np.array([dh1dt, dh2dt])


t = np.linspace(0, 300, 300)
t_tensor = torch.tensor(t, dtype=torch.float32, requires_grad=True).unsqueeze(1)
y0 = np.array([0, 0])

sol = solve_ivp(edo_np, [t[0], t[-1]], y0, t_eval=t)
h1_exp = sol.y[0] + np.random.normal(0, 1, len(t)) * 0.1
h2_exp = sol.y[1] + np.random.normal(0, 1, len(t)) * 0.1

plot_tanks(
    t, [h1_exp, h2_exp], ["h1 (exp)", "h2 (exp)"], scatter=2, filename="exp_tanks"
)

# plt.figure(figsize=(10, 4), layout="constrained")
# plt.plot(t, F(t), label="F")
# plt.show()


# Definindo rede neural


In [2]:
from torch import nn


class BaseModel(nn.Module):
    def __init__(self):
        super().__init__()
        input_size = 1  # t
        hidden_size = 8
        output_size = 2  # h1, h2

        self.hidden_layer = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, output_size),
        )

    def forward(self, x):
        output = self.hidden_layer((x * 2 / torch.max(t_tensor)) - 1)
        return torch.abs((output + 1) * (torch.max(h1_exp) / 2))


## Definindo função Loss


In [3]:
from utils import dydx, mean_square

h1_exp = torch.tensor(h1_exp, dtype=torch.float32)
h2_exp = torch.tensor(h2_exp, dtype=torch.float32)


def edo_torch(t, Y):
    h1, h2 = Y

    # Equations
    dh1dt = (F(t, torch) - cv1 * torch.sqrt(h1)) / a1
    dh2dt = (cv1 * torch.sqrt(h1) - cv2 * torch.sqrt(h2)) / a2
    return [dh1dt, dh2dt]


def loss_fn(model, t):
    # Loss das EDOs
    Y_pred = model(t)
    h1_pred, h2_pred = Y_pred[:, 0], Y_pred[:, 1]

    dh1dt_pinn, dh2dt_pinn = dydx(t, h1_pred), dydx(t, h2_pred)
    dh1dt_edo, dh2dt_edo = edo_torch(t, [h1_pred, h2_pred])

    loss_EDO1 = mean_square(dh1dt_pinn - dh1dt_edo)
    loss_EDO2 = mean_square(dh2dt_pinn - dh2dt_edo)

    # Loss das condições iniciais
    t0 = torch.tensor([[0.0]], requires_grad=True)
    Y0 = model(t0)
    h1_0, h2_0 = Y0[:, 0], Y0[:, 1]

    loss_ic1 = mean_square(h1_0 - y0[0])
    loss_ic2 = mean_square(h2_0 - y0[1])

    # Loss dos dados
    loss_data_h1 = mean_square(h1_pred - h1_exp)
    loss_data_h2 = mean_square(h2_pred - h2_exp)

    # Loss total
    loss_total = (
        loss_EDO1 + loss_EDO2 + loss_data_h1 + loss_data_h2 + loss_ic1 + loss_ic2
    )

    return loss_total


# Testando métodos


In [4]:
n_execuções = 2
target_loss = 0.1

Adam_study_path = "../results/Adam-studies.hkl"
Adam_results_path = "../results/Adam-speeds.hkl"
Adam_model_path = "../results/Adam-model.pt"

GA_study_path = "../results/GA-studies.hkl"
GA_results_path = "../results/GA-speeds.hkl"
GA_model_path = "../results/GA-model.pt"


def count_fails(losses):
    return np.sum(np.array(losses) > target_loss)


## Adam


### Otimizando hiperparametros


In [5]:
import os

import hickle as hkl
import optuna


def objective(trial: optuna.Trial):
    torch.manual_seed(42)
    test_model = BaseModel()

    lr = trial.suggest_float("lr", 1e-15, 1)
    beta1 = trial.suggest_float("beta1", 1e-10, 1)
    beta2 = trial.suggest_float("beta2", 1e-10, 1)

    optimizer = torch.optim.Adam(test_model.parameters(), lr=lr, betas=(beta1, beta2))

    # O Loop de treinamento
    for _ in range(1000):
        # Coloca o modelo no modo de treinamento
        test_model.train()

        # Calcula o loss usando a nossa função loss.
        loss = loss_fn(test_model, t_tensor)

        # Ajusta os valores do modelo
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return float(loss_fn(test_model, t_tensor).detach().numpy())


study = optuna.create_study(direction="minimize", study_name="Adam-study")

if not os.path.exists(Adam_study_path):
    print("Otimizando hiperparametros do Adam")
    study.optimize(objective, n_trials=100)
    print("Salvando estudo para futuras execuções")
    hkl.dump(study, Adam_study_path)
else:
    print("Recuperando estudo dos hiperparametros do Adam.")
    study = hkl.load(Adam_study_path)

best_Adam = study.best_trial
print("Melhor Adam:")
print("  Valor do Loss:", best_Adam.value)

print("  hiperparametros:")
for key, value in best_Adam.params.items():
    print(f"    {key}: {value}")


[I 2025-01-12 20:23:57,169] A new study created in memory with name: Adam-study


Recuperando estudo dos hiperparametros do Adam.
Melhor Adam:
  Valor do Loss: 0.03868981450796127
  hiperparametros:
    lr: 0.003703217091831923
    beta1: 0.8559747074607573
    beta2: 0.39993555192502994


### Medindo desempenho


In [6]:
import time

Adam_times = []
Adam_losses = []
Adam_model = BaseModel()

if not (os.path.exists(Adam_results_path) and os.path.exists(Adam_model_path)):
    print("Fazendo testes de desempenho do Adam")
    for i in range(n_execuções):
        start_time = time.monotonic()
        torch.manual_seed(i)
        Adam_model = BaseModel()
        optimizer = torch.optim.Adam(
            Adam_model.parameters(),
            lr=best_Adam.params["lr"],
            betas=(best_Adam.params["beta1"], best_Adam.params["beta2"]),
        )

        loss_value = 0
        for epoch in range(5000):
            # Coloca o modelo no modo de treinamento
            Adam_model.train()

            # Calcula o loss usando a nossa função loss.
            loss = loss_fn(Adam_model, t_tensor)

            # Ajusta os valores do modelo
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss_value = float(loss.detach().numpy())
            if loss_value < target_loss:
                break

        elapsed_time = time.monotonic() - start_time
        Adam_times.append(elapsed_time)
        Adam_losses.append(loss_value)
    print("Salvando para futuras execuções")
    hkl.dump((Adam_times, Adam_losses), Adam_results_path)
    Adam_model.eval()
    torch.save(Adam_model.state_dict(), Adam_model_path)
else:
    print("Recuperando testes anteriores")
    Adam_times, Adam_losses = hkl.load(Adam_results_path)
    Adam_model.load_state_dict(torch.load(Adam_model_path, weights_only=True))

print(f"Média do tempo (Adam): {np.mean(Adam_times):.3f}s")
print("Tentativas falhadas:", count_fails(Adam_losses))


Recuperando testes anteriores
Média do tempo (Adam): 2.116s
Tentativas falhadas: 0


In [7]:
@torch.inference_mode()
def test_model(model):
    y = model(t_tensor)
    return [y[:, 0], y[:, 1]]


pinn_h1, pinn_h2 = test_model(Adam_model)

# Gráfico
plot_tanks(
    t,
    (h1_exp, h2_exp, pinn_h1, pinn_h2),
    ["h1 (exp)", "h2 (exp)", "h1 (PINN Adam)", "h2 (PINN Adam)"],
    scatter=2,
    filename="adam_tanks",
)


## Algoritmo Genético


In [8]:
import pygad
from pygad.torchga import torchga

GA_model = BaseModel()


def fitness_func(ga_instance, solution, solution_idx):
    model_weights_dict = torchga.model_weights_as_dict(
        model=GA_model, weights_vector=solution
    )
    GA_model.load_state_dict(model_weights_dict)

    GA_model.eval()
    loss = loss_fn(GA_model, t_tensor)

    # Quanto menor o loss, maior o fitness
    return -loss.item()


### Otimizando hiperparametros


In [9]:
def objective(trial: optuna.Trial):
    torch.manual_seed(42)
    test_model = BaseModel()

    parent_selection_type = trial.suggest_categorical(
        "parent_selection_type", ["sss", "rws", "sus", "rank", "random", "tournament"]
    )
    keep_elitism = trial.suggest_int("keep_elitism", 0, 10)
    num_parents_mating = trial.suggest_int("num_parents_mating", 2, 10)

    crossover_type = trial.suggest_categorical(
        "crossover_type", ["single_point", "two_points", "uniform", "scattered", None]
    )
    crossover_probability = trial.suggest_float("crossover_probability", 0, 1)

    mutation_type = trial.suggest_categorical(
        "mutation_type", ["random", "swap", "inversion", "scramble", "adaptive", None]
    )
    mutation_probability = trial.suggest_float("mutation_probability", 0, 1)

    # Configura o TorchGA para criar populações baseadas no modelo
    torch_ga = torchga.TorchGA(model=test_model, num_solutions=50)

    # Configura o algoritmo genético
    ga_instance = pygad.GA(
        # Configurações
        initial_population=torch_ga.population_weights,  # População inicial
        fitness_func=fitness_func,  # Função de aptidão
        num_generations=80,  # Número de gerações
        random_seed=42,
        init_range_low=-4,
        init_range_high=4,
        # Parâmetros para otimizar
        parent_selection_type=parent_selection_type,
        keep_elitism=keep_elitism,
        num_parents_mating=num_parents_mating,
        crossover_type=crossover_type,  # type: ignore
        crossover_probability=crossover_probability,
        mutation_type=mutation_type,  # type: ignore
        mutation_probability=(
            mutation_probability if mutation_type != "adaptive" else [0.8, 0.1]
        ),
    )

    # Executa o algoritmo genético
    ga_instance.run()

    _, best_solution_fitness, _ = ga_instance.best_solution()

    return float(best_solution_fitness)


study = optuna.create_study(direction="maximize", study_name="GA-study")

if not os.path.exists(GA_study_path):
    print("Otimizando hiperparametros do Algoritmo Genético")
    study.optimize(objective, n_trials=100)
    print("Salvando estudo para futuras execuções")
    hkl.dump(study, GA_study_path)
else:
    print("Recuperando estudo dos hiperparametros do Algoritmo Genético.")
    study = hkl.load(GA_study_path)

best_GA = study.best_trial
print("Melhor GA:")
print("  Valor do Loss:", best_GA.value)

print("  hiperparametros:")
for key, value in best_GA.params.items():
    print(f"    {key}: {value}")


[I 2025-01-12 20:23:57,534] A new study created in memory with name: GA-study


Recuperando estudo dos hiperparametros do Algoritmo Genético.
Melhor GA:
  Valor do Loss: -0.2702963352203369
  hiperparametros:
    parent_selection_type: tournament
    keep_elitism: 8
    num_parents_mating: 4
    crossover_type: None
    crossover_probability: 0.27540356384223963
    mutation_type: adaptive
    mutation_probability: 0.3874911981463413


### Medindo desempenho


In [10]:
GA_times = []
GA_losses = []
GA_model = BaseModel()


def on_generation(ga_instance):
    if ga_instance.best_solution()[1] > -target_loss:
        return "stop"

    return None


if best_GA.params["mutation_type"] == "adaptive":
    best_GA.params["mutation_probability"] = [0.8, 0.1]

if not (os.path.exists(GA_results_path) and os.path.exists(GA_model_path)):
    print("Fazendo testes de desempenho do GA")
    for i in range(n_execuções):
        start_time = time.monotonic()
        torch.manual_seed(i)
        GA_model = BaseModel()

        torch_ga = torchga.TorchGA(model=GA_model, num_solutions=100)
        ga_instance = pygad.GA(
            initial_population=torch_ga.population_weights,
            fitness_func=fitness_func,
            random_seed=i,
            num_generations=50,
            init_range_low=-4,
            init_range_high=4,
            on_generation=on_generation,
            **best_GA.params,
        )
        ga_instance.run()
        best_solution, best_solution_fitness, _ = ga_instance.best_solution()

        elapsed_time = time.monotonic() - start_time
        GA_times.append(elapsed_time)
        GA_losses.append(-best_solution_fitness)

        model_weights_dict = torchga.model_weights_as_dict(
            model=GA_model, weights_vector=best_solution
        )
        GA_model.load_state_dict(model_weights_dict)

    print("Salvando para futuras execuções")
    hkl.dump((GA_times, GA_losses), GA_results_path)
    GA_model.eval()
    torch.save(GA_model.state_dict(), GA_model_path)
else:
    print("Recuperando testes anteriores")
    GA_times, GA_losses = hkl.load(GA_results_path)
    GA_model.load_state_dict(torch.load(GA_model_path, weights_only=True))

print(f"Média do tempo (GA): {np.mean(GA_times):.3f}s")
print("Tentativas falhadas:", count_fails(GA_losses))


Recuperando testes anteriores
Média do tempo (GA): 45.804s
Tentativas falhadas: 2


In [11]:
pinn_h1, pinn_h2 = test_model(GA_model)

# Gráfico
plot_tanks(
    t,
    (h1_exp, h2_exp, pinn_h1, pinn_h2),
    ["h1 (exp)", "h2 (exp)", "h1 (PINN GA)", "h2 (PINN GA)"],
    scatter=2,
    filename="ga_tanks",
)


In [12]:
for nome, param in GA_model.named_parameters():
    print(f"Nome: {nome}")
    print(f"Valor: {param}")


Nome: hidden_layer.0.weight
Valor: Parameter containing:
tensor([[ 2.1858],
        [-0.1780],
        [-1.6431],
        [ 1.0773],
        [-1.2982],
        [ 1.0562],
        [-1.3724],
        [ 1.2637]], requires_grad=True)
Nome: hidden_layer.0.bias
Valor: Parameter containing:
tensor([ 1.8094,  0.2047,  0.3160, -0.0519,  2.0033, -0.9800, -1.3178, -0.1154],
       requires_grad=True)
Nome: hidden_layer.2.weight
Valor: Parameter containing:
tensor([[-0.0243, -2.4715, -0.2404, -0.8101, -0.8423, -1.3759, -1.6582, -0.2309],
        [-0.6367,  0.7258,  0.0584, -0.3996, -0.6812,  1.3138,  2.0029,  1.4318],
        [ 1.2525, -1.3153, -2.0236,  0.6997, -0.5913,  1.9079, -0.6822, -0.9790],
        [-0.4022, -0.9995,  1.9338, -0.5817,  0.7677, -1.2706,  0.2724, -0.4656],
        [ 0.9965,  0.3907, -1.5333,  0.0191,  0.5278, -0.2682, -0.3564,  0.7896],
        [-0.1341, -2.2246, -0.4204, -0.0642,  0.3585, -1.3890,  1.0711,  0.2997],
        [-0.2110,  0.9569,  0.3655, -0.3288, -0.6878,  0.3