# Modelagem Orientada a Objetos

Diante da necessidade de estruturar o código de maneira mais clara e adequada e também da possibilidade de adicionar novas funcionalidades, como validação e testes, foi lançado mão a orientação a objetos, a qual será abordada neste notebook.

In [None]:
!pip install plotly --quiet

import torch
import torch.nn as nn
import numpy as np

import plotly.express as px
import plotly.graph_objects as go

import torch.optim as optim

np.random.seed(1)

## Definição da Classe

Na abordagem orientada a objetos nesta implementação, a classe apresenta um cosntrutor que define os principais parâmetros da rede, ao passo que cada um dos métodos são, na realidade, passos pequenas passos na abordagem da solução.

In [None]:
class pinn:

  # Método contrutor
  def __init__(self, comprimento_x: int = 1, comprimento_y: int = 1, tempo_final: int = 1,
    pontos_no_contorno: int = 1000, pontos_no_dominio: int = 2000, numero_de_neuronios = [3, 20, 20, 20, 2],
    alpha: float = 0.2, epocas: int = 20000, learning_rate: float = 0.01,
    train: bool = False, verbose: bool = False, max_time = None,
    device = torch.device('cuda' if torch.cuda.is_available () else 'cpu')):

    self.comprimento_x = comprimento_x
    self.comprimento_y = comprimento_y
    self.tempo_final = tempo_final
    self.alpha = alpha
    self.epocas = epocas
    self.learning_rate = learning_rate
    self.pontos_no_contorno = pontos_no_contorno
    self.pontos_no_dominio = pontos_no_dominio
    self.numero_de_neuronios = numero_de_neuronios
    self.learning_rate = learning_rate

    if type(device) == str:
      device = torch.device(device)
    self.device = device

    if train:
      self.treinamento_da_rede()

  # Método para geração de pontos no domínio
  def gerar_pontos_contorno(self):
    pontos_por_lado = self.pontos_no_contorno // 6

    # Lado 1 (x = qualquer, y= 0, t = qualquer)
    x_lado1 = np.random.uniform(size=(pontos_por_lado,1),low=0,high=self.comprimento_x)
    y_lado1 = 0 * np.ones((pontos_por_lado,1))
    t_lado1 = np.random.uniform(size=(pontos_por_lado,1),low=0,high=self.tempo_final)

    u_lado1 = 0 * np.ones((pontos_por_lado,1))
    v_lado1 = 0 * np.ones((pontos_por_lado,1))

    # Lado 2 (x = 0, y= qualquer, t = qualquer)
    x_lado2 = 0 * np.ones((pontos_por_lado,1))
    y_lado2 = np.random.uniform(size=(pontos_por_lado,1),low=0,high=self.comprimento_y)
    t_lado2 = np.random.uniform(size=(pontos_por_lado,1),low=0,high=self.tempo_final)

    u_lado2 = 0 * np.ones((pontos_por_lado,1))
    v_lado2 = 0 * np.ones((pontos_por_lado,1))

    # Lado 3 (x = 1, y = qualquer, t = qualquer)
    x_lado3 = 1 * np.ones((pontos_por_lado,1))
    y_lado3 = np.random.uniform(size=(pontos_por_lado,1),low=0,high=self.comprimento_y)
    t_lado3 = np.random.uniform(size=(pontos_por_lado,1),low=0,high=self.tempo_final)

    u_lado3 = 0 * np.ones((pontos_por_lado,1))
    v_lado3 = 0 * np.ones((pontos_por_lado,1))

    # Lado 4 (x = qualquer, y = 1, t = qualquer)
    x_lado4 = np.random.uniform(size=(pontos_por_lado,1),low=0,high=self.comprimento_x)
    y_lado4 = 1 * np.ones((pontos_por_lado,1))
    t_lado4 = np.random.uniform(size=(pontos_por_lado,1),low=0,high=self.tempo_final)

    u_lado4 = 0 * np.ones((pontos_por_lado,1))
    v_lado4 = 0 * np.ones((pontos_por_lado,1))

    # Condicao inicial (x = qualquer, y=qualquer, t = 0)
    x_inicial = np.random.uniform(size=(2*pontos_por_lado,1), low=0, high=self.comprimento_x)
    y_inicial = np.random.uniform(size=(2*pontos_por_lado,1), low=0, high=self.comprimento_y)
    t_inicial = 0 * np.ones((2*pontos_por_lado,1))

    # Modificação das condições iniciais para garantir u=0 nos contornos
    u_inicial = 0 * np.ones((2*pontos_por_lado,1))
    v_inicial = np.sin(np.pi * x_inicial / self.comprimento_x) * np.cos(np.pi * x_inicial / self.comprimento_y)


    # Juntar todos os lados
    x_todos = np.vstack((x_lado1,x_lado2,x_lado3,x_lado4,x_inicial))
    y_todos = np.vstack((y_lado1,y_lado2,y_lado3,y_lado4,y_inicial))
    t_todos = np.vstack((t_lado1,t_lado2,t_lado3,t_lado4,t_inicial))
    u_todos = np.vstack((u_lado1,u_lado2,u_lado3,u_lado4,u_inicial))
    v_todos = np.vstack((v_lado1,v_lado2,v_lado3,v_lado4,v_inicial))


    # Criar arrays X e Y
    X_contorno = np.hstack((x_todos,y_todos,t_todos))
    Y_contorno = np.hstack((u_todos,v_todos))

    self.X_contorno = X_contorno
    self.Y_contorno = Y_contorno
    # X contorno - reúne os pontos
    # Y contorno - reúne velocidades

  def gerar_pontos_equacao(self):
    x_dominio = np.random.uniform(size=(self.pontos_no_dominio,1),low=0,high=self.comprimento_x)
    y_dominio = np.random.uniform(size=(self.pontos_no_dominio,1),low=0,high=self.comprimento_y)
    t_dominio = np.random.uniform(size=(self.pontos_no_dominio,1),low=0,high=self.tempo_final)

    X_equacao = np.hstack((x_dominio,y_dominio,t_dominio))

    self.X_equacao = X_equacao

  def criar_rede_neural(self):
    # Criar uma lista de todas as camadas
    camadas = []

    # Para cada camada, adicionar as conexões e a função de ativação
    for i in range(len(self.numero_de_neuronios)-1):
      camadas.append(nn.Linear(self.numero_de_neuronios[i],self.numero_de_neuronios[i+1]))
      camadas.append(nn.Tanh())

    # Remover a última camada, pois é a função de ativação
    camadas.pop()
    #camadas.pop()

    # Criar rede
    self.rna = nn.Sequential(*camadas)

  def calc_perda_contorno(self):
    Y_predito = self.rna(self.X_contorno)
    perda_contorno = nn.functional.mse_loss(Y_predito, self.Y_contorno)
    self.perda_contorno = perda_contorno

  def calc_residuo(self):
    x = self.X_equacao[:,0].reshape(-1, 1)
    y = self.X_equacao[:,1].reshape(-1, 1)
    t = self.X_equacao[:,2].reshape(-1, 1)

    # Dois valores preditos pela rede - velocidade u(x,y,t) e velocidade v(x,y,t)
    V = self.rna(torch.hstack((x, y, t)))
    u = V[:,0].reshape(-1,1)
    v = V[:,1].reshape(-1,1)

    u_x = torch.autograd.grad(u, x, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]
    u_xx = torch.autograd.grad(u_x, x, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]
    u_y = torch.autograd.grad(u, y, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]
    u_yy = torch.autograd.grad(u_y, y, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]

    v_x = torch.autograd.grad(v, x, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]
    v_xx = torch.autograd.grad(v_x, x, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]
    v_y = torch.autograd.grad(v, y, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]
    v_yy = torch.autograd.grad(v_y, y, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]

    u_t = torch.autograd.grad(u, t, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]
    v_t = torch.autograd.grad(v, t, grad_outputs=torch.ones_like(u),retain_graph=True, create_graph=True)[0]

    residual_u = (u_t + u * u_x + v * u_y - 0.01/np.pi * (u_xx + u_yy))
    residual_v = (v_t + u * v_x + v * v_y - 0.01/np.pi * (u_xx + u_yy))

    residuo_total = torch.cat((residual_u, residual_v), dim=1)
    self.residuo_total = residuo_total

  def calc_perda_equacao(self):
    self.calc_residuo()
    residuo = torch.mean(torch.square(self.residuo_total))

    self.perda_equacao = residuo

  def calc_perda(self):

    self.calc_perda_contorno()
    self.calc_perda_equacao()

    perda = (1-self.alpha)*self.perda_contorno + self.alpha*self.perda_equacao
    self.perda = perda
    self.perda_contorno = self.perda_contorno
    self.perda_equacao = self.perda_equacao

  def definicao_otimizador(self):
      otimizador = torch.optim.Adam(self.rna.parameters(),self.learning_rate)
      agendador = torch.optim.lr_scheduler.StepLR(otimizador, step_size=1000, gamma=0.9)

      self.X_equacao = torch.tensor(self.X_equacao,requires_grad=True,dtype=torch.float)
      self.X_contorno = torch.tensor(self.X_contorno,dtype=torch.float)
      self.Y_contorno = torch.tensor(self.Y_contorno,dtype=torch.float)

      self.X_equacao = self.X_equacao.to(self.device)
      self.X_contorno = self.X_contorno.to(self.device)
      self.Y_contorno = self.Y_contorno.to(self.device)
      self.rna = self.rna.to(self.device)

  def calcular_grid(self,nx=101, ny=101, nt=101):

    # Definir grid
    x = np.linspace(0.,self.comprimento_x,nx)
    y = np.linspace(0.,self.comprimento_y,ny)
    t = np.linspace(0.,self.tempo_final,nt)
    [t_grid, y_grid, x_grid] = np.meshgrid(t,y,x)
    x = torch.tensor(x_grid.flatten()[:,None],requires_grad=True,dtype=torch.float).to(self.device)
    y = torch.tensor(y_grid.flatten()[:,None],requires_grad=True,dtype=torch.float).to(self.device)
    t = torch.tensor(t_grid.flatten()[:,None],requires_grad=True,dtype=torch.float).to(self.device)

    # Avaliar modelor
    self.rna.eval()
    Y_pred = self.rna(torch.hstack((x,y,t)))

    # Formatar resultados em array
    u_pred = Y_pred.cpu().detach().numpy()[:,0].reshape(x_grid.shape)
    v_pred = Y_pred.cpu().detach().numpy()[:,1].reshape(x_grid.shape)
    self.x_grid = x_grid
    self.y_grid = y_grid
    self.t_grid = t_grid
    self.u_pred = u_pred
    self.v_pred = v_pred

  def treinamento_da_rede(self):
    otimizador = torch.optim.Adam(self.rna.parameters(),self.learning_rate)
    agendador = torch.optim.lr_scheduler.StepLR(otimizador, step_size=1000, gamma=0.9)

    perda_historico = np.zeros(self.epocas)
    perda_contorno_historico = np.zeros(self.epocas)
    perda_equacao_historico = np.zeros(self.epocas)
    epocas = np.array(range(self.epocas))

    # Colocar rede em modo de treinamento
    self.rna.train()

    # FAZER ITERAÇÃO
    for epoca in epocas:
      # Resortear pontos
      #X_equacao = gerar_pontos_equacao(pontos_no_dominio,comprimento_x,tempo_final)
      #X_equacao = torch.tensor(X_equacao,requires_grad=True,dtype=torch.float).to(device)

      # Inicializar gradientes
      otimizador.zero_grad()

      # Calcular perdas
      self.calc_perda()

      # Backpropagation
      self.perda.backward()

      # Passo do otimizador
      otimizador.step()
      agendador.step()

      # Guardar logs
      perda_historico[epoca] = self.perda.item()
      perda_contorno_historico[epoca] = self.perda_contorno.item()
      perda_equacao_historico[epoca] = self.perda_equacao.item()

      if epoca%500==0:
        print(f'Epoca: {epoca}, Perda: {self.perda.item()} (Contorno: {self.perda_contorno.item()}, Equacao: {self.perda_equacao.item()})')
    self.perda_historico = perda_historico
    self.perda_contorno_historico = perda_contorno_historico
    self.perda_equacao_historico = perda_equacao_historico

  def plot_historico(self):
    # Plotar histórico
    epocas = np.array(range(self.epocas))
    fig = go.FigureWidget()
    fig.add_trace(go.Scatter(x=epocas, y=self.perda_historico, name='Total', line=dict(color='black', width=4)))
    fig.add_trace(go.Scatter(x=epocas, y=self.perda_contorno_historico, name='Contorno', line=dict(color='blue', width=2)))
    fig.add_trace(go.Scatter(x=epocas, y=self.perda_equacao_historico, name='Equacao', line=dict(color='red', width=2)))
    fig.update_yaxes(type="log")
    fig.show(renderer="colab")

  def plot_resultados(self):
    # Calcular valores da função e gerar grids
    epocas = np.array(range(self.epocas))
    self.calcular_grid()

    # Plotar figura
    fig = go.Figure(data=[go.Surface(x=self.x_grid, y=self.t_grid, z=self.u_pred)])

    fig.update_layout(scene=dict(aspectratio=dict(x=1.5, y=1.5, z=0.5)))
    fig.show()

    ind_t_plot = 0 # Esse é o índice do tempo a ser plotado

    # Extrair uma "fatia" dos arrays de coordenadas e soluções
    x_plot = self.x_grid[:,ind_t_plot,:]
    y_plot = self.y_grid[:,ind_t_plot,:]
    u_plot = self.u_pred[:,ind_t_plot,:]

    fig = go.Figure(data=[go.Surface(x=x_plot, y=y_plot, z=u_plot)])

    fig.update_layout(scene=dict(aspectratio=dict(x=1.5, y=1.5, z=0.5)))
    fig.show()

### Modelando e Resolvendo

In [None]:
pinn_burgers = pinn(1, 1, 1, 1000, 2000, [3, 20, 20, 20, 2], 0.2, 1000, 0.01, False, False, None, 'cuda')

Agora, para o contorno

In [None]:
pinn_burgers.gerar_pontos_contorno()
X_equacao = pinn_burgers.gerar_pontos_equacao()

In [None]:
pinn_burgers.criar_rede_neural()

Para o otimizador

In [None]:
pinn_burgers.definicao_otimizador()

Treinando a rede

In [None]:
pinn_burgers.treinamento_da_rede()

pinn_burgers.plot_historico()

Epoca: 0, Perda: 0.003730565309524536 (Contorno: 0.003354890737682581, Equacao: 0.005233264062553644)
Epoca: 500, Perda: 0.003521948354318738 (Contorno: 0.0031819180585443974, Equacao: 0.004882069770246744)
