In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import matplotlib
import matplotlib.pyplot as plt
import time

In [2]:
# PINN 모델 정의
class PINN(nn.Module):
    def __init__(self):
        super(PINN, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(2, 64), nn.Tanh(),
            nn.Linear(64, 64), nn.Tanh(),
            nn.Linear(64, 64), nn.Tanh(),
            nn.Linear(64, 64), nn.Tanh(),
            nn.Linear(64, 64), nn.Tanh(),
            nn.Linear(64, 64), nn.Tanh(),
            nn.Linear(64, 3)
        )

    def forward(self, x, y):
        input_data = torch.stack([x.view(-1), y.view(-1)], dim=1)
        return self.model(input_data)


# 경계 조건 (물리적 단위에 맞게 수정)
def boundary_conditions_wall_cylinder(x, y):

    u_boundary = np.zeros_like(x) 
    v_boundary = np.zeros_like(y)  

    return u_boundary, v_boundary

# 경계 조건 (물리적 단위에 맞게 수정)
def boundary_conditions_wall_channel( x, y):

    u_boundary = np.zeros_like(x) 
    v_boundary = np.zeros_like(y)  

    return u_boundary, v_boundary

def boundary_conditions_inflow(x,y):
    u_boundary = 4*0.3*(y)*(0.4-y)/(0.4*0.4) 
    v_boundary = np.zeros_like(y)  
    return u_boundary, v_boundary


def boundary_conditions_outflow(x,y):
    p_boundary = np.zeros_like(x)  
    return p_boundary


def navier_stokes_residual(uvp, x, y):
    u, v, p = torch.chunk(uvp, 3, dim=1)  

    # Constants
    rho = 1  # Density
    mu = 0.001  # Viscosity coefficient

    # Compute derivatives using automatic differentiation
    u_x = torch.autograd.grad(u.sum(), x, create_graph=True)[0]
    u_y = torch.autograd.grad(u.sum(), y, create_graph=True)[0]
    u_xx = torch.autograd.grad(u_x.sum(), x, create_graph=True)[0]
    u_yy = torch.autograd.grad(u_y.sum(), y, create_graph=True)[0]

    v_x = torch.autograd.grad(v.sum(), x, create_graph=True)[0]
    v_y = torch.autograd.grad(v.sum(), y, create_graph=True)[0]
    v_xx = torch.autograd.grad(v_x.sum(), x, create_graph=True)[0]
    v_yy = torch.autograd.grad(v_y.sum(), y, create_graph=True)[0]

    p_x = torch.autograd.grad(p.sum(), x, create_graph=True)[0]
    p_y = torch.autograd.grad(p.sum(), y, create_graph=True)[0] 

    # Navier-Stokes equations for u and v momentum
    residual_u = rho*(u * u_x + v * u_y) - mu*(u_xx + u_yy) + p_x
    residual_v = rho*(u * v_x + v * v_y) - mu*(v_xx + v_yy) + p_y
    
    # Continuity equation for incompressibility condition
    residual_p = (u_x + v_y)

    return residual_u, residual_v, residual_p

In [3]:
# cylinder 벽면 점들을 추가
cylinder_points = 1000
theta = np.linspace(0, 2 * np.pi, cylinder_points).reshape(-1, 1)
x_wall_boundary = 0.2 + np.cos(theta)*0.05  
y_wall_boundary = 0.2 + np.sin(theta)*0.05


# inflow 점들을 추가
y_inflow_boundary = np.linspace(0, 0.4, 400).reshape(-1, 1)
x_inflow_boundary = np.zeros_like(y_inflow_boundary)

# outflow 점들을 추가
y_outflow_boundary = np.linspace(0, 0.4, 400).reshape(-1, 1)
x_outflow_boundary = np.full_like(y_outflow_boundary, 2.2)

x_up_wall_boundary = np.linspace(0, 2.2, 2200).reshape(-1, 1)
y_up_wall_boundary = np.full_like(x_up_wall_boundary, 0.4)


x_down_wall_boundary = np.linspace(0, 2.2, 2200).reshape(-1, 1)
y_down_wall_boundary = np.full_like(x_down_wall_boundary, 0)


x_collocation = []
y_collocation = []

cnt = 0
while True:
    # if (cnt < 5000):
    #     x_c = np.random.uniform(0, 0.41)
    # else:
    #     x_c = np.random.uniform(0, 2.2)
    x_c = np.random.uniform(0, 2.2)
    y_c = np.random.uniform(0, 0.4)

    if not ((x_c-0.2) ** 2 + (y_c-0.2) ** 2 < 0.05 ** 2):
        x_collocation.append(x_c)
        y_collocation.append(y_c)
        cnt += 1
        if (cnt > 20000):
            break

x_collocation = np.array(x_collocation).reshape(-1, 1)
y_collocation = np.array(y_collocation).reshape(-1, 1)

In [4]:
def train_PINN(model, x_wall_boundary, y_wall_boundary,x_up_wall_boundary, y_up_wall_boundary, x_down_wall_boundary, y_down_wall_boundary, x_inflow_boundary, y_inflow_boundary,x_outflow_boundary, y_outflow_boundary, x_collocation, y_collocation, epochs, learning_rate):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    #optimizer = optim.LBFGS(model.parameters(), lr = 10, line_search_fn="strong_wolfe")
    #scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, eta_min= 0.00001,T_max=1000)

    loss_fn = nn.MSELoss()
    
    def closure():
        optimizer.zero_grad()

        x_wall_boundary_tensor, y_wall_boundary_tensor, \
        x_up_wall_boundary_tensor, y_up_wall_boundary_tensor, \
        x_down_wall_boundary_tensor, y_down_wall_boundary_tensor,\
        x_inflow_boundary_tensor, y_inflow_boundary_tensor,\
        x_outflow_boundary_tensor, y_outflow_boundary_tensor,\
        x_collocation_tensor, y_collocation_tensor  = (
            torch.tensor(_, dtype=torch.float32, device="cuda", requires_grad=True) for _ in
            [x_wall_boundary, y_wall_boundary, \
            x_up_wall_boundary, y_up_wall_boundary, \
            x_down_wall_boundary, y_down_wall_boundary, \
            x_inflow_boundary, y_inflow_boundary,\
            x_outflow_boundary, y_outflow_boundary,\
            x_collocation, y_collocation]
        )
    
        loss = 0

        u_wall_boundary_pred = model(x_wall_boundary_tensor, y_wall_boundary_tensor)
        u_wall_boundary_exact = torch.tensor(boundary_conditions_wall_cylinder(x_wall_boundary, y_wall_boundary), requires_grad=True, dtype=torch.float32, device = "cuda")
        loss += loss_fn(u_wall_boundary_pred[:,0].view(-1, 1), u_wall_boundary_exact[0].view(-1,1))
        loss += loss_fn(u_wall_boundary_pred[:,1].view(-1, 1), u_wall_boundary_exact[1].view(-1,1))

        u_inflow_boundary_pred = model(x_inflow_boundary_tensor, y_inflow_boundary_tensor)
        u_inflow_boundary_exact = torch.tensor(boundary_conditions_inflow(x_inflow_boundary, y_inflow_boundary ), requires_grad=True, dtype=torch.float32, device = "cuda")
        loss += loss_fn(u_inflow_boundary_pred[:,0].view(-1, 1), u_inflow_boundary_exact[0].view(-1,1))
        loss += loss_fn(u_inflow_boundary_pred[:,1].view(-1, 1), u_inflow_boundary_exact[1].view(-1,1))
        
        
        u_collocation_pred = model(x_collocation_tensor, y_collocation_tensor)
        equation_residual_u, equation_residual_v, equation_residual_p = navier_stokes_residual(u_collocation_pred, x_collocation_tensor, y_collocation_tensor)
        loss += loss_fn(equation_residual_u, torch.zeros_like(equation_residual_u))
        loss += loss_fn(equation_residual_v, torch.zeros_like(equation_residual_v))
        loss += loss_fn(equation_residual_p, torch.zeros_like(equation_residual_p))


        u_up_wall_boundary_pred = model(x_up_wall_boundary_tensor, y_up_wall_boundary_tensor)
        u_up_wall_boundary_exact = torch.tensor(boundary_conditions_wall_channel(x_up_wall_boundary, y_up_wall_boundary ), requires_grad=True, dtype=torch.float32, device = "cuda")
        loss += loss_fn(u_up_wall_boundary_pred[:,0].view(-1, 1), u_up_wall_boundary_exact[0].view(-1,1))
        loss += loss_fn(u_up_wall_boundary_pred[:,1].view(-1, 1), u_up_wall_boundary_exact[1].view(-1,1))


        u_down_wall_boundary_pred = model(x_down_wall_boundary_tensor, y_down_wall_boundary_tensor)
        u_down_wall_boundary_exact = torch.tensor(boundary_conditions_wall_channel(x_down_wall_boundary, y_down_wall_boundary), requires_grad=True, dtype=torch.float32, device = "cuda")
        loss += loss_fn(u_down_wall_boundary_pred[:,0].view(-1, 1), u_down_wall_boundary_exact[0].view(-1,1))
        loss += loss_fn(u_down_wall_boundary_pred[:,1].view(-1, 1), u_down_wall_boundary_exact[1].view(-1,1))
        

        u_outflow_boundary_pred = model(x_outflow_boundary_tensor, y_outflow_boundary_tensor)
        u_outflow_boundary_exact = torch.tensor(boundary_conditions_outflow(x_outflow_boundary, y_outflow_boundary), requires_grad=True, dtype=torch.float32, device = "cuda")
        loss += loss_fn(u_outflow_boundary_pred[:,2].view(-1, 1), u_outflow_boundary_exact.view(-1,1))

        loss.backward()
        return loss
        
    min_loss = 0.5
    for epoch in range(epochs):
        loss_ = optimizer.step(closure)
        #scheduler.step()
        
        if (torch.isnan(loss_)):
            break

        if (epoch%10 == 0):
            print(f"Epoch [{epoch}/{epochs}], Loss: {loss_.item()}")
            torch.save(model.state_dict(), "model/test_current.pt")

        if (loss_.item()<min_loss):
            torch.save(model.state_dict(), "model/test_current_min6.pt")
            min_loss = loss_.item()
        

    print("Training completed.")


In [5]:
# 모델 생성 및 학습 
model = PINN().cuda()
model.load_state_dict(torch.load("model/test_current_min5.pt"))

<All keys matched successfully>

In [6]:
train_PINN(model, x_wall_boundary, y_wall_boundary, x_up_wall_boundary, y_up_wall_boundary, x_down_wall_boundary, y_down_wall_boundary, x_inflow_boundary, y_inflow_boundary,x_outflow_boundary, y_outflow_boundary, x_collocation, y_collocation, epochs=50000, learning_rate=0.0001)

Epoch [0/50000], Loss: 0.0017726932419463992
Epoch [10/50000], Loss: 0.000206630167667754
Epoch [20/50000], Loss: 0.000126705490401946
Epoch [30/50000], Loss: 7.434558938257396e-05
Epoch [40/50000], Loss: 5.4944524890743196e-05
Epoch [50/50000], Loss: 4.510513463173993e-05
Epoch [60/50000], Loss: 3.9113074308261275e-05
Epoch [70/50000], Loss: 3.6593446566257626e-05
Epoch [80/50000], Loss: 3.426883995416574e-05
Epoch [90/50000], Loss: 3.250947338528931e-05
Epoch [100/50000], Loss: 3.083053161390126e-05
Epoch [110/50000], Loss: 3.004892641911283e-05
Epoch [120/50000], Loss: 2.9185894163674675e-05
Epoch [130/50000], Loss: 2.8022885089740157e-05
Epoch [140/50000], Loss: 2.764000601018779e-05
Epoch [150/50000], Loss: 2.6998442990588956e-05
Epoch [160/50000], Loss: 2.6511477699386887e-05
Epoch [170/50000], Loss: 2.6196679755230434e-05
Epoch [180/50000], Loss: 2.6418507331982255e-05
Epoch [190/50000], Loss: 2.551271245465614e-05
Epoch [200/50000], Loss: 2.5132600057986565e-05
Epoch [210/50000

KeyboardInterrupt: 