### 使用DEEPONET训练
基于物理信息的求解方法
$$ 
\frac{\partial s}{\partial t} = D \frac{\partial^2 s}{\partial x^2} + k s^2 + u(x)
$$

1.随机取样U(X)函数，输入枝干

2.选取观察点，输入主干网络

3.利用pytorch的自动微分计算二阶导数，一阶导数，从而计算方程形式上的残差。

In [1]:
import torch
import torch.nn as nn
import torch.autograd as autograd

class BranchNet(nn.Module):
    def __init__(self, input_dim=10):  # 默认输入维度为10
        super(BranchNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 50),  # 输入维度是 num_samples
            nn.ReLU(),
            nn.Linear(50, 100),
            nn.ReLU(),
            nn.Linear(100, 50),  # 输出特征维度
        )
    def forward(self, u):
        return self.fc(u)

class TrunkNet(nn.Module):
    def __init__(self, input_dim=2):  # 默认输入维度为2
        super(TrunkNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 50),  # 输入维度是 2 (x, t)
            nn.ReLU(),
            nn.Linear(50, 100),
            nn.ReLU(),
            nn.Linear(100, 50),  # 输出特征维度
        )
    def forward(self, coords):
        # coords 的形状是 [batch_size, num_coords, 2]
        return self.fc(coords)
class DeepONet(nn.Module):
    def __init__(self, branch_input_dim=10, trunk_input_dim=2):
        super(DeepONet, self).__init__()
        self.branch = BranchNet(input_dim=branch_input_dim)
        self.trunk = TrunkNet(input_dim=trunk_input_dim)
    
    def forward(self, u, coords):
        # BranchNet 的输出 [batch_size, 50]
        branch_out = self.branch(u)
        # TrunkNet 的输出 [batch_size, num_coords, 50]
        trunk_out = self.trunk(coords)
        # 将 branch_out 添加一个维度 [batch_size, 1, 50]
        branch_out = branch_out.unsqueeze(1)
        # 点积合并
        return torch.sum(branch_out * trunk_out, dim=-1)  # 输出形状 [batch_size, num_coords]


In [2]:
# 初始化网络
model = DeepONet(branch_input_dim=10, trunk_input_dim=2)

# 随机生成输入数据
u_samples = torch.rand(32, 10)  # [batch_size, num_samples]
coords = torch.rand(32, 50, 2)  # [batch_size, num_coords, 2]

# 前向传播
output = model(u_samples, coords)
print(output.shape)  # 预期输出形状为 [32, 50]


torch.Size([32, 50])


In [3]:
def physics_loss(model, u_samples, coords, D, k):
    """
    model: DeepONet 模型
    u_samples: 输入函数 u(x) 的采样值 [batch_size, num_samples]
    coords: 空间和时间坐标 [batch_size, num_coords, 2] (x, t)
    D: 扩散系数
    k: 反应速率
    """
    # Ensure tensors require gradients
    coords.requires_grad = True
    u_samples.requires_grad = True

    # Predict the solution s(x, t)
    s_pred = model(u_samples, coords)  # [batch_size, num_coords]

    # Compute ∂s/∂t
    ds_dt = autograd.grad(
        outputs=s_pred, inputs=coords, grad_outputs=torch.ones_like(s_pred),
        create_graph=True, retain_graph=True
    )[0][..., 1]  # Extract gradient w.r.t. time

    # Compute ∂^2s/∂x^2
    ds_dx = autograd.grad(
        outputs=s_pred, inputs=coords, grad_outputs=torch.ones_like(s_pred),
        create_graph=True, retain_graph=True
    )[0][..., 0]  # Extract gradient w.r.t. space

    d2s_dx2 = autograd.grad(
        outputs=ds_dx, inputs=coords, grad_outputs=torch.ones_like(ds_dx),
        create_graph=True, retain_graph=True
    )[0][..., 0]  # Second derivative w.r.t. space

    # Compute residual: ∂s/∂t - D * ∂^2s/∂x^2 - k * s^2 - u(x)
    residual = ds_dt - D * d2s_dx2 - k * s_pred**2 - u_samples.mean(dim=-1).unsqueeze(1)

    # Loss is the mean of squared residuals
    return torch.mean(residual**2)


In [4]:
# Parameters
D = 0.01  # Diffusion coefficient
k = 0.01  # Reaction rate
batch_size = 32
num_samples = 10
num_coords = 50

# Generate random inputs
u_samples = torch.rand(batch_size, num_samples)  # Input samples
coords = torch.rand(batch_size, num_coords, 2)  # Spatial and temporal coordinates

# Ensure requires_grad is enabled
coords.requires_grad = True
u_samples.requires_grad = True

# Define model
model = DeepONet()

# Compute physics loss
loss = physics_loss(model, u_samples, coords, D, k)

print("Physics-informed loss:", loss.item())


Physics-informed loss: 0.24676446616649628


In [5]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(1000):
    optimizer.zero_grad()
    loss = physics_loss(model, u_samples, coords, D, k)
    loss.backward()
    optimizer.step()

    if epoch % 100 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")


Epoch 0, Loss: 0.24676446616649628
Epoch 100, Loss: 0.0004936123150400817
Epoch 200, Loss: 0.00022589806758332998
Epoch 300, Loss: 0.00011837792408186942
Epoch 400, Loss: 4.742137753055431e-05
Epoch 500, Loss: 2.6898076612269506e-05
Epoch 600, Loss: 1.699847780400887e-05
Epoch 700, Loss: 1.2978903214388993e-05
Epoch 800, Loss: 1.5198611436062492e-05
Epoch 900, Loss: 8.042125045903958e-06
