In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.autograd import grad, Variable

In [None]:
#Lets Create a Simple NN (MLP, 3 layers, RELU)
class SimpleNN(nn.Sequential):
    """SimpleNN.

    Args:
        input_size  (int): Input size.
        hidden_size (int): Hidden size.
        output_size (int): Output size.
    """
    def __init__(self, input_size, hidden_size, output_size):
        pass

In [None]:
# Lets Define the Physics Loss
# We need to store one paramer (velocity)
# The formula is df/dt=v*df/dx
class SinPhyLoss(nn.Module):
    """SinPhy loss.

    Args:
        velocity (float): velocity.
    """

    def __init__(self, velocity):
        super().__init__()
        self.velocity = velocity

    def forward(self, pY, t, x, weight=None, **kwargs):
        """Forward Function.

        Args:
            pY (Tensor): of shape (N, C, H, W). Predicted tensor.
            x (Tensor): of shape (N, C, H, W).  Input truth tensor.
            weight (Tensor, optional): of shape (N, C, H, W). Element-wise
                weights. Default: None.
        """
        pass

In [None]:
#Lets new define a train step
def train_step(fx, train_data, train_phys, train_bound,
               loss_phys_func,
               data_coef=1, phys_coef=1, bound_coef=1):
    """Train step.

    Args:
        fx (nn.Module): Network.
        train_data (Tensor):  of shape (N, 3, H, W). Train data (regression) with t, x and f.
        train_phys (Tensor):  of shape (M, 3, H, W). Train phys data with t and x.
        train_bound (Tensor): of shape (L, 3, H, W). Train bound data points with t, x and f.
        loss_phys_func (nn.Module): Physics loss.
        data_coef (float):  Weight of the regression term in the loss. Default 1.
        phys_coef (float):  Weight of the physiscs term in the loss. Default 1.
        bound_coef (float): Weight of the bound conditions term in the loss. Default 1.
    """
    pass

In [None]:
#Util for batch sampling
def batch_sample(data, batch_size):
    indices = torch.randint(low=0, high=data.shape[0], size=(batch_size,))

    return data[indices]

def domain_sampling(batch_size, T, L):
    t = torch.rand(size=(batch_size,))*T
    x = torch.rand(size=(batch_size,))*L

    return torch.stack((t, x), dim=1)


In [None]:
def gen_data(L=1.0, T=2.0, v=0.1, Nx=101, Nt=1000):
    # Parameters
    L = 1.0  # Length of the string
    T = 2.0  # Total time of simulation
    v = 0.1  # Wave speed
    Nx = 101  # Number of spatial points
    Nt = 1000  # Number of time steps

    # Derived parameters
    dx = L / (Nx - 1)  # Spatial step size
    dt = T / Nt  # Time step size
    c = v * dt / dx  # Courant number

    if c > 1:
        raise ValueError("The Courant number must be <= 1 for stability.")

    # Spatial and temporal grids
    x = np.linspace(0, L, Nx)
    t = np.linspace(0, T, Nt)

    # Initial conditions
    f = np.zeros((Nt, Nx))  # Initialize the solution array

    # Boundary conditions (fixed ends)
    f[:, 0] = 0
    f[:, -1] = 0

    # Initial condition at t = 0
    # OPTION A: Considering the string is already vibrating
    f[0, :] = np.sin(10 * np.pi * x)

    # OPTION B: Considering we just hit the string with our thumb in the center (Gaussian model at a point point)
    # mu = L/2
    # sigma = L/20
    # f[0, :] = np.exp(-(x - mu)**2/(2 * sigma**2))

    # Assume zero initial velocity to start the numerical propagation
    f[1, :] = f[0, :]


    # Finite difference scheme
    for n in range(1, Nt - 1):
        f[n + 1, 1:-1] = (
            2 * f[n, 1:-1]
            - f[n - 1, 1:-1]
            + c**2 * (f[n, 2:] - 2 * f[n, 1:-1] + f[n, :-2])
        )

    X = np.zeros((t.shape[0], x.shape[0], 2), np.float32)
    y = np.zeros((t.shape[0], x.shape[0]), np.float32)
    boundary = np.zeros((t.shape[0]*2, 3), np.float32)

    for i, ti in enumerate(t):
        for j, xj in enumerate(x):
            X[i, j, 0] = ti
            X[i, j, 1] = xj
            y[i, j] = f[i, j]

    boundary[:t.shape[0], 0] = t
    boundary[t.shape[0]:, 0] = t
    boundary[:t.shape[0], 1] = x[0]
    boundary[t.shape[0]:, 1] = x[-1]

    X = X.reshape((t.shape[0]*x.shape[0], 2))
    y = y.reshape((t.shape[0]*x.shape[0], 1))

    return X, y, boundary

In [None]:
# Visualization
def animate_wave(f, tx, T=2.0, Nt=1000, interval=200):
    dt = T / Nt

    #plt.figure(figsize=(8, 4))
    for n in range(0, Nt, interval):
        t = n * dt
        idx_t = np.abs(tx[:, 0]-t) < dt

        x_t = tx[idx_t, 1]
        f_t = f[idx_t, 0]

        #plt.clf()
        plt.plot(x_t, f_t, label=f"Time: {n * dt:.2f} s")
        #plt.pause(0.01)
    plt.ylim(-1.2, 1.2)
    plt.xlabel("Position (x)")
    plt.ylabel("Displacement (f)")
    plt.title("1D Wave Equation Simulation")
    #plt.legend()
    plt.show()


In [None]:
#Some common paramerters
RS = 1    #Random Seed
L = 1.0   # Length of the string
T = 2.0   # Total time of simulation
v = 0.1   # Wave speed
Nx = 101  # Number of spatial points
Nt = 1000 # Number of time steps
N  = int(1e5)  # Number of training iterations
BS = 256  # Batch size
PN = 100  # Print results every this number

#Set Random seed for numpy and torch
np.random.seed(RS)
g=torch.manual_seed(RS)

In [None]:
#First, generate data and boundary

#Lets get some random points for training
NTrain = int(0.5*tx.shape[0]) # We are going to use 50% of points for training in this example

#Shuffle!
ind_train = np.arange(0,tx.shape[0],1,dtype=np.int32)
np.random.shuffle(ind_train)
ind_train = ind_train[:NTrain]

#Create tensors
train_data = torch.from_numpy(np.concatenate([tx[ind_train], f[ind_train]], axis=1))
train_boundary = torch.from_numpy(boundary)

#Second, let's create the physics loss
loss_phys_func = SinPhyLoss(velocity=v)

#Third, network
network = SimpleNN(input_size=2, hidden_size=512, output_size=1)

#Finally, Adam
optimizer = Adam(network.parameters(), lr=0.001)

In [None]:
loss_m = 0

#Train loop
for i in range(N):
    #Pick a random batch of data and boundaries
    train_data_i = Variable(batch_sample(train_data, BS))
    train_boundary_i = Variable(batch_sample(train_boundary, BS))

    #Domain points
    train_phys_i = Variable(domain_sampling(BS, T, L))

    #Train step
    optimizer.zero_grad()
    losses = train_step(network,
                        train_data_i, train_phys_i, train_boundary_i,
                        loss_phys_func)
    loss_m += losses['loss'].item()
    ld = losses['loss_data']
    lb = losses['loss_bound']
    lp = losses['loss_phys']
    losses['loss'].backward()
    n = i+1
    optimizer.step()
    if i % PN == 0:
        print(f'Iter {i}, Loss: {loss_m/float(n)}, Loss data: {ld}, Loss boundary: {lb}, Loss phys: {lp}')

Iter 0, Loss: 0.17664356529712677,                 Loss data: 0.12789037823677063,                 Loss boundary: 0.0023139941040426493,                 Loss phys: 0.04643918573856354
Iter 100, Loss: 0.16609232127666473,                 Loss data: 0.12222839891910553,                 Loss boundary: 0.00304559082724154,                 Loss phys: 0.040818337351083755
Iter 200, Loss: 0.17905452847480774,                 Loss data: 0.1452096700668335,                 Loss boundary: 0.0030615648720413446,                 Loss phys: 0.03078330121934414
Iter 300, Loss: 0.14920419454574585,                 Loss data: 0.11549407243728638,                 Loss boundary: 0.0034802884329110384,                 Loss phys: 0.030229832977056503
Iter 400, Loss: 0.15845757722854614,                 Loss data: 0.12414795160293579,                 Loss boundary: 0.007842435501515865,                 Loss phys: 0.02646719478070736
Iter 500, Loss: 0.1492358148097992,                 Loss data: 0.116565868

KeyboardInterrupt: 