In [None]:
import torch
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import time
import torch.nn.functional as F
from skopt import BayesSearchCV
from skopt.space import Real
from skopt import gp_minimize
import torch.nn as nn
import seaborn as sns
from scipy.optimize import minimize
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import matplotlib as mpl
from skopt.utils import use_named_args
from tqdm import tqdm
import multiprocessing
import threading

In [2]:
# 设置GPU设备，如果没有可用的GPU，则使用CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

setup_seed(888888)

print(device)

cpu


In [13]:
# 基础参数
epochs = 50001    # 训练代数
h = 1000    # 画图网格密度
N = 1000    # 内点配置点数
N1 = 0.01    # 边界点配置点数
N2 = 1000    # PDE数据点

# 约束函数

## 内点

In [9]:
def ugeq(n=N2):
    x = torch.rand(n, 1, device=device)
    y = torch.rand(n, 1, device=device)
    cond = torch.zeros_like(y)
    return x.requires_grad_(True), y.requires_grad_(True), cond

def pde(n=N2):
    x = torch.rand(n, 1, device=device)
    y = torch.rand(n, 1, device=device)
    cond = 2 * torch.pi**2 * torch.sin(torch.pi * x) * torch.sin(torch.pi * y)
    return x.requires_grad_(True), y.requires_grad_(True), cond

def JU(n=N2):
    # 不等式形
    x = torch.rand(n, 1, device=device)
    y = torch.rand(n, 1, device=device)
    cond = 2 * torch.pi**2 * torch.sin(torch.pi * x) * torch.sin(torch.pi * y)
    return x.requires_grad_(True), y.requires_grad_(True), cond

## 边界

In [11]:
# 边界条件
def boundary1(n=N1):    # 上边界
    x = torch.arange(0, 1.001, N1, device=device).view(-1, 1)
    y = torch.ones_like(x, device=device)
    cond = torch.zeros_like(x, device=device)
    return x.requires_grad_(True), y.requires_grad_(True), cond

def boundary2(n=N1):    # 右边界
    y = torch.arange(0, 1.001, N1, device=device).view(-1, 1)
    x = torch.ones_like(y, device=device)
    cond = torch.zeros_like(x, device=device)
    return x.requires_grad_(True), y.requires_grad_(True), cond

def boundary3(n=N1):
    # y轴
    y = torch.arange(0, 1.001, N1, device=device).view(-1, 1)
    x = torch.zeros_like(y, device=device)
    cond = torch.zeros_like(x, device=device)
    return x.requires_grad_(True), y.requires_grad_(True), cond

def boundary4(n=N1):
    # x轴
    x = torch.arange(0, 1.001, N1, device=device).view(-1, 1)
    y = torch.zeros_like(x, device=device)
    cond = torch.zeros_like(x, device=device)
    return x.requires_grad_(True), y.requires_grad_(True), cond

# Plnn框架


In [5]:
class MLP(torch.nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.net = torch.nn.Sequential(
            torch.nn.Linear(2, 32),
            torch.nn.Tanh(),
            torch.nn.Linear(32, 32),
            torch.nn.Tanh(),
            torch.nn.Linear(32, 32),
            torch.nn.Tanh(),
            torch.nn.Linear(32, 32),
            torch.nn.Tanh(),
            torch.nn.Linear(32, 1)
        )

    def forward(self, x):
        return self.net(x)

# 损失函数

In [7]:
# Loss
loss = torch.nn.MSELoss()

# 递归求导
def gradients(u, x, order=1):
    if order == 1:
        return torch.autograd.grad(u, x, grad_outputs=torch.ones_like(u),
                                   create_graph=True,
                                   only_inputs=True, )[0]
    else:
        return gradients(gradients(u, x), x, order=order - 1)

In [8]:
def l_ugeq(u):
    # u>=0
    x, y, cond = ugeq()
    uxy = u(torch.cat([x, y], dim=1))
    llos = torch.relu(-uxy)
    return 100 * torch.sum(llos)

def l_pde(u):
    # 等式项损失
    x, y, cond = pde()
    uxy = u(torch.cat([x, y], dim=1))
    return loss(-gradients(uxy, x, 2) - gradients(uxy, y, 2), cond)

def l_JU(u):
    # 不等式损失
    x, y, cond = JU()
    uxy = u(torch.cat([x, y], dim=1))
    llos = 0.5 * (gradients(uxy, x, 1)**2 + gradients(uxy, y, 1)**2) - cond * uxy
    return loss(0.5 * (gradients(uxy, x, 1)**2 + gradients(uxy, y, 1)**2), cond * uxy)

In [12]:
# 边界函数构建
def l_boundary1(u):
    x, y, cond = boundary1()
    uxy = u(torch.cat([x, y], dim=1))
    return loss(uxy, cond)

def l_boundary2(u):
    x, y, cond = boundary2()
    uxy = u(torch.cat([x, y], dim=1))
    return loss(uxy, cond)

def l_boundary3(u):
    x, y, cond = boundary3()
    uxy = u(torch.cat([x, y], dim=1))
    return loss(uxy, cond)

def l_boundary4(u):
    x, y, cond = boundary4()
    uxy = u(torch.cat([x, y], dim=1))
    return loss(uxy, cond)

# 纯训练

In [18]:
# Training
u = MLP().to(device)
opt = torch.optim.Adam(params=u.parameters(), lr=0.0005)
current_time = time.time()

# 设置初始权重
weight_pde = 55
weight_JU = 2
weight_boundary = 100

for i in range(epochs):
    
    opt.zero_grad()

    # 计算损失
    loss_pde = l_pde(u)
    loss_JU = l_JU(u)
    loss_ugeq = l_ugeq(u)
    loss_boundary = l_boundary1(u) +  l_boundary2(u) + l_boundary3(u) + l_boundary4(u)

    # 计算加权总损失
    total_loss = weight_pde * loss_pde + weight_JU * loss_JU + 1000 * loss_ugeq + weight_boundary * loss_boundary

    # 反向传播和优化
    total_loss.backward()
    opt.step()

    # 每100次迭代记录一次损失
    if i % 100 == 0:
        print(f"At epoch {i}, time_speed:{abs(current_time-time.time())} loss is: {total_loss.data}")
        print('------------------------------------------------------------------------------------------------------------------------------------')
        current_time = time.time()


At epoch 0, time_speed:0.5833084583282471 loss is: 19608414.0
------------------------------------------------------------------------------------------------------------------------------------


KeyboardInterrupt: 

# 贝叶斯

## 权重训练

In [None]:
# 定义训练函数
def train_model(weight_pde, weight_JU, weight_boundary):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    u = MLP().to(device)
    opt = torch.optim.Adam(params=u.parameters(), lr=0.0005)
    current_time = time.time()
    epochs = 1000  # 设置为实际的训练轮数
    for i in range(epochs):
        opt.zero_grad()
        
        # 计算损失
        loss_pde = l_pde(u)
        loss_JU = l_JU(u)
        loss_ugeq = l_ugeq(u)
        loss_boundary = l_boundary1(u) + l_boundary2(u) + l_boundary3(u) + l_boundary4(u)
        
        # 计算加权总损失
        total_loss = weight_pde * loss_pde + weight_JU * loss_JU + 1000 * loss_ugeq + weight_boundary * loss_boundary
        
        # 反向传播和优化
        total_loss.backward()
        opt.step()
        
        # 每100次迭代记录一次损失
        if i % 100 == 0:
            print(f"At epoch {i}, time_speed:{abs(current_time-time.time())} loss is: {total_loss.data}")
            print('------------------------------------------------------------------------------------------------------------------------------------')
            current_time = time.time()
    
    # 返回最终的损失作为优化目标
    return total_loss.item()

# 定义贝叶斯优化目标函数
def objective(trial):
    weight_pde = trial.suggest_float('weight_pde', 1, 100)
    weight_JU = trial.suggest_float('weight_JU', 1, 100)
    weight_boundary = trial.suggest_float('weight_boundary', 1, 100)
    return train_model(weight_pde, weight_JU, weight_boundary)

# 运行贝叶斯优化
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=50)

# 输出最佳权重
best_params = study.best_params
print(f"Best parameters: {best_params}")

## 可视化

In [None]:
trials = [
    {'weight_pde': 55.0, 'weight_JU': 2.0, 'weight_boundary': 100.0, 'total_loss': 0.123},
    {'weight_pde': 60.5, 'weight_JU': 3.1, 'weight_boundary': 95.2, 'total_loss': 0.117},
    {'weight_pde': 50.2, 'weight_JU': 1.8, 'weight_boundary': 105.3, 'total_loss': 0.119},
    # 更多试验数据...
]

# 提取数据
weights_pde = [trial['weight_pde'] for trial in trials]
weights_JU = [trial['weight_JU'] for trial in trials]
weights_boundary = [trial['weight_boundary'] for trial in trials]
total_losses = [trial['total_loss'] for trial in trials]
trial_numbers = list(range(1, len(trials) + 1))

# 绘制权重变化折线图
plt.figure(figsize=(10, 6))
plt.plot(trial_numbers, weights_pde, label='Weight PDE')
plt.plot(trial_numbers, weights_JU, label='Weight JU')
plt.plot(trial_numbers, weights_boundary, label='Weight Boundary')
plt.xlabel('Trial Number')
plt.ylabel('Weight Value')
plt.title('Weight Values Across Trials')
plt.legend()
plt.show()

# 绘制总损失变化折线图
plt.figure(figsize=(10, 6))
plt.plot(trial_numbers, total_losses, label='Total Loss', color='red')
plt.xlabel('Trial Number')
plt.ylabel('Total Loss')
plt.title('Total Loss Across Trials')
plt.legend()
plt.show()