In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import grad
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import time

# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# ===========================================
# 参数设置
# ===========================================
# 时间参数
t_start = 0.0
t_end = 0.5
time_steps = 10  # 时间步数

# 空间域
x_min, x_max = 0.0, 1.0
y_min, y_max = 0.0, 1.0

# 材料参数
K = torch.tensor([[1.0, 0.0], [0.0, 1.0]], dtype=torch.float32, device=device)
S = torch.tensor(0.0, dtype=torch.float32, device=device)  # 源项为0

# ===========================================
# 精确解函数 (用于验证)
# ===========================================
def exact_solution(x, y, t):
    """精确解函数"""
    return np.sin(np.pi * x) * np.sin(np.pi * y) * np.exp(-2 * np.pi**2 * t) + x + y

# ===========================================
# PINN 实现 (物理信息神经网络)
# ===========================================
class PINN(nn.Module):
    def __init__(self, layers):
        super(PINN, self).__init__()
        self.net = nn.Sequential()
        for i in range(len(layers) - 1):
            self.net.add_module(f"linear_{i}", nn.Linear(layers[i], layers[i + 1]))
            if i < len(layers) - 2:
                self.net.add_module(f"tanh_{i}", nn.Tanh())
    
    def forward(self, xyt):
        return self.net(xyt)

def train_pinn(epochs=10000, lr=0.001):
    print("Starting PINN training...")
    start_time = time.time()
    
    # 创建训练数据
    n_interior = 2000  # 内部点数量
    n_boundary = 500   # 边界点数量
    n_initial = 500    # 初始条件点数量
    
    # 内部点 (时空域)
    interior_points = torch.zeros(n_interior, 3, dtype=torch.float32, device=device)
    interior_points[:, 0] = torch.rand(n_interior) * (x_max - x_min) + x_min  # x
    interior_points[:, 1] = torch.rand(n_interior) * (y_max - y_min) + y_min  # y
    interior_points[:, 2] = torch.rand(n_interior) * (t_end - t_start) + t_start  # t
    
    # 边界点 (空间边界，时间变化)
    boundary_points = []
    boundary_values = []
    
    # x=0边界
    for _ in range(n_boundary // 4):
        y_val = torch.rand(1).item() * (y_max - y_min) + y_min
        t_val = torch.rand(1).item() * (t_end - t_start) + t_start
        boundary_points.append([0.0, y_val, t_val])
        boundary_values.append(y_val)  # u = y
    
    # x=1边界
    for _ in range(n_boundary // 4):
        y_val = torch.rand(1).item() * (y_max - y_min) + y_min
        t_val = torch.rand(1).item() * (t_end - t_start) + t_start
        boundary_points.append([1.0, y_val, t_val])
        boundary_values.append(1.0 + y_val)  # u = 1 + y
    
    # y=0边界
    for _ in range(n_boundary // 4):
        x_val = torch.rand(1).item() * (x_max - x_min) + x_min
        t_val = torch.rand(1).item() * (t_end - t_start) + t_start
        boundary_points.append([x_val, 0.0, t_val])
        boundary_values.append(x_val)  # u = x
    
    # y=1边界
    for _ in range(n_boundary // 4):
        x_val = torch.rand(1).item() * (x_max - x_min) + x_min
        t_val = torch.rand(1).item() * (t_end - t_start) + t_start
        boundary_points.append([x_val, 1.0, t_val])
        boundary_values.append(1.0 + x_val)  # u = 1 + x
    
    boundary_points = torch.tensor(boundary_points, dtype=torch.float32, device=device)
    boundary_values = torch.tensor(boundary_values, dtype=torch.float32, device=device)
    
    # 初始条件点 (t=0)
    initial_points = torch.zeros(n_initial, 3, dtype=torch.float32, device=device)
    initial_points[:, 0] = torch.rand(n_initial) * (x_max - x_min) + x_min  # x
    initial_points[:, 1] = torch.rand(n_initial) * (y_max - y_min) + y_min  # y
    initial_points[:, 2] = 0.0  # t=0
    
    # 初始值计算
    initial_values = torch.zeros(n_initial, dtype=torch.float32, device=device)
    for i in range(n_initial):
        x_val = initial_points[i, 0].item()
        y_val = initial_points[i, 1].item()
        initial_values[i] = np.sin(np.pi * x_val) * np.sin(np.pi * y_val) + x_val + y_val
    
    # 模型和优化器
    model = PINN([3, 50, 50, 50, 1]).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    # 训练循环
    for epoch in range(epochs):
        optimizer.zero_grad()
        
        # 内部点损失
        interior_points.requires_grad = True
        u_pred = model(interior_points)
        
        # 计算时间导数
        du_dt = grad(u_pred, interior_points, torch.ones_like(u_pred), create_graph=True)[0][:, 2]
        
        # 计算空间梯度
        du_dx = grad(u_pred, interior_points, torch.ones_like(u_pred), create_graph=True)[0][:, 0]
        du_dy = grad(u_pred, interior_points, torch.ones_like(u_pred), create_graph=True)[0][:, 1]
        
        # 计算二阶导数
        d2u_dx2 = grad(du_dx, interior_points, torch.ones_like(du_dx), create_graph=True)[0][:, 0]
        d2u_dy2 = grad(du_dy, interior_points, torch.ones_like(du_dy), create_graph=True)[0][:, 1]
        
        # PDE 残差: ∂u/∂t - ∇·(K∇u) = 0 (S=0)
        # 对于各向同性扩散，K = [[1,0],[0,1]]
        pde_res = du_dt - (d2u_dx2 + d2u_dy2)
        pde_loss = torch.mean(pde_res**2)
        
        # 边界损失
        u_b_pred = model(boundary_points)
        bc_loss = torch.mean((u_b_pred.squeeze() - boundary_values)**2)
        
        # 初始条件损失
        u_i_pred = model(initial_points)
        ic_loss = torch.mean((u_i_pred.squeeze() - initial_values)**2)
        
        # 总损失
        loss = pde_loss + bc_loss + ic_loss
        loss.backward()
        optimizer.step()
        
        if epoch % 1000 == 0:
            print(f"Epoch {epoch}: Loss = {loss.item():.6f}, PDE Loss = {pde_loss.item():.6f}, "
                  f"BC Loss = {bc_loss.item():.6f}, IC Loss = {ic_loss.item():.6f}")
    
    training_time = time.time() - start_time
    print(f"PINN training completed in {training_time:.2f} seconds")
    return model

# ===========================================
# VPINN 实现 (变分物理信息神经网络)
# ===========================================
class VPINN(nn.Module):
    def __init__(self, layers):
        super(VPINN, self).__init__()
        self.net = nn.Sequential()
        for i in range(len(layers) - 1):
            self.net.add_module(f"linear_{i}", nn.Linear(layers[i], layers[i + 1]))
            if i < len(layers) - 2:
                self.net.add_module(f"tanh_{i}", nn.Tanh())
    
    def forward(self, xyt):
        return self.net(xyt)

def gauss_points(n=5):
    """生成高斯积分点和权重"""
    x, w = np.polynomial.legendre.leggauss(n)
    return torch.tensor(x, dtype=torch.float32, device=device), torch.tensor(w, dtype=torch.float32, device=device)

def train_vpinn(epochs=5000, lr=0.001):
    print("Starting VPINN training...")
    start_time = time.time()
    
    # 模型和优化器
    model = VPINN([3, 50, 50, 50, 1]).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    # 高斯积分点
    gp, gw = gauss_points(5)
    
    # 训练循环
    for epoch in range(epochs):
        optimizer.zero_grad()
        loss_total = 0
        
        # 在空间单元上积分 (固定时间)
        n_space_cells = 10  # 空间单元数
        for i in range(n_space_cells):
            for j in range(n_space_cells):
                # 空间单元域 [xi, xi+1] x [yj, yj+1]
                xi = i / n_space_cells * (x_max - x_min) + x_min
                xip1 = (i + 1) / n_space_cells * (x_max - x_min) + x_min
                yj = j / n_space_cells * (y_max - y_min) + y_min
                yjp1 = (j + 1) / n_space_cells * (y_max - y_min) + y_min
                
                # 时间采样
                for t_val in np.linspace(t_start, t_end, time_steps):
                    # 映射到 [-1,1]x[-1,1]
                    xg = (gp + 1) * 0.5 * (xip1 - xi) + xi
                    yg = (gp + 1) * 0.5 * (yjp1 - yj) + yj
                    
                    # 创建积分点
                    XX, YY = torch.meshgrid(xg, yg)
                    points = torch.stack([XX.ravel(), YY.ravel(), torch.full_like(XX.ravel(), t_val)], dim=1)
                    points.requires_grad = True
                    
                    # 计算解
                    u = model(points)
                    
                    # 计算梯度
                    du = grad(u, points, torch.ones_like(u), create_graph=True)[0]
                    du_dx = du[:, 0]
                    du_dy = du[:, 1]
                    du_dt = du[:, 2]
                    
                    # 计算变分残差 (弱形式)
                    # ∂u/∂t - ∇·(K∇u) = 0 (S=0)
                    # 弱形式: ∫(∂u/∂t * v + ∇u·∇v) dx dy
                    # 这里v是测试函数，我们取v=1
                    residual = du_dt - (du_dx**2 + du_dy**2)
                    
                    # 高斯积分
                    jacobian = (xip1 - xi) * (yjp1 - yj) / 4
                    integral = torch.sum(residual.view(len(gp), len(gp)) * gw.unsqueeze(1) * gw.unsqueeze(0)) * jacobian
                    loss_total += integral**2
        
        # 边界损失 (空间边界)
        boundary_points = []
        boundary_values = []
        # 添加边界点
        for t_val in np.linspace(t_start, t_end, time_steps):
            # x=0边界
            for y_val in np.linspace(y_min, y_max, 10):
                boundary_points.append([0.0, y_val, t_val])
                boundary_values.append(y_val)
            
            # x=1边界
            for y_val in np.linspace(y_min, y_max, 10):
                boundary_points.append([1.0, y_val, t_val])
                boundary_values.append(1.0 + y_val)
            
            # y=0边界
            for x_val in np.linspace(x_min, x_max, 10):
                boundary_points.append([x_val, 0.0, t_val])
                boundary_values.append(x_val)
            
            # y=1边界
            for x_val in np.linspace(x_min, x_max, 10):
                boundary_points.append([x_val, 1.0, t_val])
                boundary_values.append(1.0 + x_val)
        
        boundary_points = torch.tensor(boundary_points, dtype=torch.float32, device=device)
        boundary_values = torch.tensor(boundary_values, dtype=torch.float32, device=device)
        
        u_b_pred = model(boundary_points)
        bc_loss = torch.mean((u_b_pred.squeeze() - boundary_values)**2)
        
        # 初始条件损失 (t=0)
        initial_points = []
        initial_values = []
        for x_val in np.linspace(x_min, x_max, 20):
            for y_val in np.linspace(y_min, y_max, 20):
                initial_points.append([x_val, y_val, 0.0])
                initial_values.append(np.sin(np.pi * x_val) * np.sin(np.pi * y_val) + x_val + y_val)
        
        initial_points = torch.tensor(initial_points, dtype=torch.float32, device=device)
        initial_values = torch.tensor(initial_values, dtype=torch.float32, device=device)
        
        u_i_pred = model(initial_points)
        ic_loss = torch.mean((u_i_pred.squeeze() - initial_values)**2)
        
        # 总损失
        loss = loss_total + bc_loss + ic_loss
        loss.backward()
        optimizer.step()
        
        if epoch % 500 == 0:
            print(f"Epoch {epoch}: Loss = {loss.item():.6f}, BC Loss = {bc_loss.item():.6f}, IC Loss = {ic_loss.item():.6f}")
    
    training_time = time.time() - start_time
    print(f"VPINN training completed in {training_time:.2f} seconds")
    return model



Using device: cuda


In [None]:
# ===========================================
# 训练和可视化
# ===========================================
if __name__ == "__main__":
    # 训练 PINN
    print("\n" + "="*50)
    pinn_model = train_pinn(epochs=5000)
    
    # 训练 VPINN
    print("\n" + "="*50)
    vpinn_model = train_vpinn(epochs=2000)
    
    # 可视化结果
    x = np.linspace(x_min, x_max, 50)
    y = np.linspace(y_min, y_max, 50)
    t_values = np.linspace(t_start, t_end, 5)  # 选择几个时间点可视化
    
    fig = plt.figure(figsize=(18, 12))
    
    for idx, t_val in enumerate(t_values):
        # 创建网格
        X, Y = np.meshgrid(x, y)
        xy = np.vstack([X.ravel(), Y.ravel()]).T
        t_array = np.full((xy.shape[0], 1), t_val)
        xyt = np.hstack([xy, t_array])
        
        # 精确解
        exact = np.zeros_like(X)
        for i in range(X.shape[0]):
            for j in range(X.shape[1]):
                exact[i, j] = exact_solution(X[i, j], Y[i, j], t_val)
        
        # PINN 预测
        with torch.no_grad():
            xyt_t = torch.tensor(xyt, dtype=torch.float32).to(device)
            pinn_pred = pinn_model(xyt_t).cpu().numpy().reshape(X.shape)
        
        # VPINN 预测
        with torch.no_grad():
            xyt_t = torch.tensor(xyt, dtype=torch.float32).to(device)
            vpinn_pred = vpinn_model(xyt_t).cpu().numpy().reshape(X.shape)
        
        # 计算误差
        pinn_error = np.abs(pinn_pred - exact)
        vpinn_error = np.abs(vpinn_pred - exact)
        
        # 绘图 - 精确解
        ax1 = fig.add_subplot(5, 3, idx*3 + 1, projection='3d')
        ax1.plot_surface(X, Y, exact, cmap='viridis')
        ax1.set_title(f'Exact Solution at t={t_val:.2f}')
        
        # 绘图 - PINN 预测
        ax2 = fig.add_subplot(5, 3, idx*3 + 2, projection='3d')
        ax2.plot_surface(X, Y, pinn_pred, cmap='viridis')
        ax2.set_title(f'PINN Prediction at t={t_val:.2f}')
        
        # 绘图 - VPINN 预测
        ax3 = fig.add_subplot(5, 3, idx*3 + 3, projection='3d')
        ax3.plot_surface(X, Y, vpinn_pred, cmap='viridis')
        ax3.set_title(f'VPINN Prediction at t={t_val:.2f}')
    
    plt.tight_layout()
    plt.savefig('pinn_vpinn_comparison_nonstationary_no_source.png')
    plt.show()
    
    # 保存模型
    torch.save(pinn_model.state_dict(), 'pinn_model_nonstationary_no_source.pth')
    torch.save(vpinn_model.state_dict(), 'vpinn_model_nonstationary_no_source.pth')
    print("Models saved to pinn_model_nonstationary_no_source.pth and vpinn_model_nonstationary_no_source.pth")

In [2]:
#读入模型 可视化真解和三个模型
pinn_model = PINN([3, 50, 50, 50, 1]).to(device)
pinn_model.load_state_dict(torch.load('pinn_model_nonstationary_no_source.pth'))
pinn_model.eval()
vpinn_model = VPINN([3, 50, 50, 50, 1]).to(device)
vpinn_model.load_state_dict(torch.load('vpinn_model_nonstationary_no_source.pth'))
vpinn_model.eval()


VPINN(
  (net): Sequential(
    (linear_0): Linear(in_features=3, out_features=50, bias=True)
    (tanh_0): Tanh()
    (linear_1): Linear(in_features=50, out_features=50, bias=True)
    (tanh_1): Tanh()
    (linear_2): Linear(in_features=50, out_features=50, bias=True)
    (tanh_2): Tanh()
    (linear_3): Linear(in_features=50, out_features=1, bias=True)
  )
)

In [2]:
import matplotlib.pyplot as plt
import numpy as np
import pdb
import sys
import torch
from torch_geometric.data import Data
from torch_geometric.data import Dataset, DataLoader
from random import sample 
import os
import networkx as nx
import scipy.io
from scipy.interpolate import griddata


sys.path.insert(0, 'C:/Users/puppyCookie/Downloads/graphGalerkin-main/graphGalerkin-main/pycamotk')
print(sys.path)
from pyCaMOtk.create_mesh_hsphere import mesh_hsphere
from pyCaMOtk.create_mesh_hcube import mesh_hcube 
from pyCaMOtk.setup_linelptc_sclr_base_handcode import setup_linelptc_sclr_base_handcode
from pyCaMOtk.create_dbc_strct import create_dbc_strct
from pyCaMOtk.create_femsp_cg import create_femsp_cg
from pyCaMOtk.solve_fem import solve_fem
from pyCaMOtk.visualize_fem import visualize_fem
from pyCaMOtk.lfcnsp import LocalFunctionSpace

sys.path.insert(0, 'C:/Users/puppyCookie/Downloads/graphGalerkin-main/graphGalerkin-main/source')
from FEM_ForwardModel import analyticalHeat3
from GCNNModel import e2vcg2connectivity,PossionNet
from TensorFEMCore_cg import Double,solve_fem_GCNN,create_fem_resjac
import setup_prob_eqn_handcode

torch.manual_seed(0)


  from .autonotebook import tqdm as notebook_tqdm


['C:/Users/puppyCookie/Downloads/graphGalerkin-main/graphGalerkin-main/pycamotk', 'c:\\Users\\puppyCookie\\anaconda3\\envs\\d2l\\python38.zip', 'c:\\Users\\puppyCookie\\anaconda3\\envs\\d2l\\DLLs', 'c:\\Users\\puppyCookie\\anaconda3\\envs\\d2l\\lib', 'c:\\Users\\puppyCookie\\anaconda3\\envs\\d2l', '', 'c:\\Users\\puppyCookie\\anaconda3\\envs\\d2l\\lib\\site-packages', 'c:\\Users\\puppyCookie\\anaconda3\\envs\\d2l\\lib\\site-packages\\win32', 'c:\\Users\\puppyCookie\\anaconda3\\envs\\d2l\\lib\\site-packages\\win32\\lib', 'c:\\Users\\puppyCookie\\anaconda3\\envs\\d2l\\lib\\site-packages\\Pythonwin']


<torch._C.Generator at 0x1684e750c10>

In [None]:
"""
Hyper prameters
"""
tol=1.0e-16
maxit=2000
# GCNN model
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
model=PossionNet(nci=3, nco=1, kk=10).to(device)
model=model.double()
#加载模型
#model.load_state_dict(torch.load('C:/Users/puppyCookie/Downloads/graphGalerkin-main/graphGalerkin-main/demo0/model_heat_6x6.pth'))

cuda


: 