# Physics-Informed Neural Networks - Harmonic Oscillator

<a href="https://colab.research.google.com/github/stephenbaek/padl-yonsei/blob/master/labs/03_pinn_harmonic_oscillator.ipynb"><img align="left" src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open in Colab" title="Open and Execute in Google Colaboratory"></a>

In [None]:
import matplotlib.pyplot as plt
import os
from PIL import Image
import IPython.display

import torch
import torch.nn as nn

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
dtype = torch.float32

# Helper functions
def save_progress(save_dir, prefix, i):
    file = os.path.join(save_dir, prefix + "_%.6i.png"%(i+1))
    plt.savefig(file, bbox_inches='tight', pad_inches=0.1, dpi=100, facecolor="white")
    return file

def make_gif(imagefiles, output_path, fps=20):
    imgs = [Image.open(file) for file in imagefiles]
    imgs[0].save(fp=output_path, format='GIF', append_images=imgs[1:], save_all=True, duration=int(1000/fps), loop=True)

def reset_parameters(model):
    for layer in model.children():
        if hasattr(layer, 'reset_parameters'):
            layer.reset_parameters()


### Recap on (Damped) Harmonic Oscillators

In classical mechanics, a harmonic oscillator is a system in which a mass experiences a restoring force proportional to its displacement from equilibrium. If we let $x$ be the displacement of the mass $m$ from the equilibrium $x=0$, by the Hooke's law, the restoring force $F_r$ has the following form:

$$F_r=-kx.$$

Also, from the Newton's second law of motion $F=m\frac{\partial^2 x}{\partial t^2}$, where the second order derivative with respect to time $t$, we have the following relationship:

$$m\frac{\partial^2 x}{\partial t^2} + kx = 0.$$

In an ideal harmonic oscillator as in the above, the mass oscillates indefinitely with constant amplitude and frequency. However, real-world oscillators are often damped and experience a resistive or damping force that opposes its motion. This means that they gradually lose energy due to frictional or resistive forces, causing the oscillations to decrease in amplitude over time. Such a damping force, say $F_d$ can be modeled as being proportional to the velocity $\frac{\partial x}{\partial t}$ of the mass, or $F_d=-c\frac{\partial x}{\partial t}$, where $c$ is called the damping coefficient.

The Newton's second law for damped harmonic oscillators then becomes:

$$F_\text{total} = F_r + F_d = -kx-c\frac{\partial x}{\partial t} = m\frac{\partial^2 x}{\partial t^2},$$

which can be rewritten into a more intuitive form:

$$\frac{\partial^2 x}{\partial t^2} + 2\zeta\omega_0\frac{\partial x}{\partial t} + \omega_0^2 x = 0,$$

where $\omega_0:=\sqrt{\frac{k}{m}}$ is the angular frequency of the oscillator and $\zeta:=\frac{c}{2\sqrt{mk}}$ is called the damping ratio.

The value of the damping ratio can vary and critically determine the behavior of the system, affecting whether the system returns to equilibrium without oscillating (overdamping; $\zeta>1$), oscillates with a gradually reducing amplitude (underdamping; $\zeta<1$), or quickly comes to rest without oscillating (critical damping; $\zeta=1$).

### Implementation of the Ground Truth (Analytic Solution)

In the underdamped or critically damped cases where $\zeta \leq 1$, the analytical solution for the state space representation of a damped harmonic oscillator can be expressed as damped sinusoidal oscillations:

$$\mathbf{x}(t) = ae^{-\zeta\omega_0 t}\sin(\sqrt{1-\zeta^2}\omega_0t + \varphi),$$

where the amplitude $a$ and phase $\varphi$ are coefficients determined from the initial conditions.

In [None]:
def determineCoefficients(X0, zeta, omega0):
    ''' Determine amplitude a and phase phi.

    Inputs:
        X0: torch.tensor of size (2,) containing initial position and velocity.
        zeta: damping ratio.
        omega0: angular frequency.

    Returns:
        Amplitude a and phase phi that match the initial condition X0.
    '''
    assert zeta <= 1, "zeta must be under-/critically-damped (zeta <= 1)"

    if torch.abs((X0[1] + zeta*omega0*X0[0])) < 1e-7:
        phi = 0.5*torch.pi
    else:
        num = torch.sqrt(torch.tensor(1-zeta**2, dtype=dtype))*omega0*X0[0]
        den = X0[1] + zeta*omega0*X0[0]
        phi = torch.arctan(num/den)
    a = X0[0]/torch.sin(phi)

    return a, phi


# Initial condition
X0 = torch.tensor([1, 0], dtype=dtype)   # initial condition
omega0 = 2*torch.pi                      # angular frequency
zeta = 0.15                               # damping ratio (<= 1)

# Determine coefficients
a, phi = determineCoefficients(X0, zeta, omega0)

# Create ground truth data
Nt = 1000    # number of time steps
Tmax = 3
t = torch.linspace(0, Tmax, Nt, dtype=dtype)

# Compute the analytic solution (ground truth) for the state x(t)
ezot = torch.exp(-zeta*omega0*t)
sqrt_omega = torch.sqrt(torch.tensor(1-zeta**2, dtype=dtype))*omega0
position = a*ezot*torch.sin(sqrt_omega*t + phi)
GT = torch.unsqueeze(position,-1)

plt.figure(figsize=(6,3))
plt.plot(t, GT[:,0], color='black', label = "Ground Truth")
plt.grid()
plt.xlabel("t")
plt.ylabel("x")
plt.title('Position')
plt.legend()

### Sample training data

Now to train a model, let's sample training data. Here, we are going to assume a scenario where we only have a partial and discrete observation of the dynamics in time.

In [None]:
Tmax_sample = 1.2       # maximum time for training sample
sample_stride = 20       # stride between samples

Nt_sample = int(Nt*Tmax_sample/Tmax)

t_sample = torch.unsqueeze(t[0:Nt_sample:sample_stride], -1)
GT_sample = GT[0:Nt_sample:sample_stride,:]

training_data = torch.hstack((t_sample, GT_sample))  # t, x

plt.figure(figsize=(6,3))
plt.plot(t, GT[:,0], color='black', label = "Ground Truth")
plt.scatter(training_data[:,0], training_data[:,1], color='orange', label='Training Data')
plt.grid()
plt.xlabel("t")
plt.ylabel("x")
plt.title('Position')
plt.legend()

### Model Design

Now, let's design a model. Here, I'm providing a simple MLP implementation as a starter. But you are encouraged to experiment with different architectures.

In [None]:
class Backbone(nn.Module):
    def __init__(self, dtype=torch.float32):
        super().__init__()

        self.fc1 = nn.Linear(1, 32, dtype=dtype)  # input dim = 1 (t)
        self.fc2 = nn.Linear(32, 32, dtype=dtype)  # hidden dims = 32, 32
        self.fc3 = nn.Linear(32, 32, dtype=dtype)  #
        self.out = nn.Linear(32, 1, dtype=dtype)  # output dim = 1 (x)

        self.dtype = dtype

    def forward(self, x):
        x = self.fc1(x)
        x = nn.SiLU()(x)
        x = self.fc2(x)
        x = nn.SiLU()(x)
        x = self.fc3(x)
        x = nn.SiLU()(x)
        return self.out(x)
    
model = Backbone().to(device)

### Training WITHOUT Physics-Informed Loss Term

Before we get started with PINN, we'll set up a simple training session without any physics informed loss terms to set a baseline. Of course, a physics-naive model would'nt generalize well beyond the observed range of dynamics.

In [None]:
def plot_progress(t, GT, prediction, training_data, collocation_t=None):
    plt.figure(figsize=(6,3))
    plt.plot(t, GT, color='black', label = "Ground Truth")
    plt.plot(t, prediction, color='deepskyblue', label = "PINN")
    plt.scatter(training_data[:,0], training_data[:,1], color='orange', label='Training Data')
    if not collocation_t is None:
        plt.scatter(collocation_t, torch.zeros_like(collocation_t), color='olive', label = "Collocation Points", s=20)
    plt.grid()
    plt.xlabel("t")
    plt.ylabel("x")
    plt.title(f'Position (Iteration={iter+1})')
    plt.legend()

    return plt

In [None]:
reset_parameters(model)
optimizer = torch.optim.Adam(model.parameters(),lr=1e-4)
files = []

save_dir = 'results/harmonic/nophysics'
if not os.path.exists(save_dir):
    os.makedirs(save_dir)
    
MAX_ITER = 30000
input = training_data[:,:1].clone().detach().to(device)    # t
output = training_data[:,1:].clone().detach().to(device)  # x
t_inference = torch.unsqueeze(t,axis=-1).clone().to(device)
for iter in range(MAX_ITER):
    optimizer.zero_grad()
    prediction = model(input)
    loss = torch.mean((output-prediction)**2)

    loss.backward()
    optimizer.step()

    print(f"{iter+1}/{MAX_ITER} - loss: {loss.detach().cpu().numpy():.5e}", end='\r')
    
    # plot the result as training progresses
    if (iter+1) % 100 == 0: 
        
        prediction = model(t_inference).detach().cpu()
        plot_progress(t, GT, prediction, training_data)
        files.append(save_progress(save_dir, 'nophys', iter))
    
        if (iter+1) % 10000 == 0: plt.show()
        else: plt.close("all")

In [None]:
# Animate the training progress
make_gif(files, "results/harmonic/nophysics.gif")
IPython.display.Image(filename="results/harmonic/nophysics.gif") 

### Training WITH Physics-Informed Loss Term

Apparently, the baseline model without a physics loss term didn't generalize well to unseen time steps. Now, let's add a physics loss term to make it predict beyond training samples.

In [None]:
reset_parameters(model)
        
optimizer = torch.optim.Adam(model.parameters(),lr=5e-4)
files = []

save_dir = 'results/harmonic/pinn'
if not os.path.exists(save_dir):
    os.makedirs(save_dir)
    
MAX_ITER = 30000
N_COLLOCATION_POINTS = 30
input = training_data[:,:1].clone().detach().to(device)    # t
output = training_data[:,1:].clone().detach().to(device)  # x
collocation_t = torch.linspace(0,Tmax,N_COLLOCATION_POINTS)
collocation_pts = torch.unsqueeze(collocation_t, -1).clone().detach().requires_grad_(True).to(device)
t_inference = torch.unsqueeze(t,axis=-1).clone().to(device)
for iter in range(MAX_ITER):
    optimizer.zero_grad()
    prediction = model(input)
    data_loss = torch.mean((output-prediction)**2)
    
    prediction_colloc = model(collocation_pts)
    dx  = torch.autograd.grad(prediction_colloc, collocation_pts, torch.ones_like(prediction_colloc), create_graph=True)[0]
    ddx  = torch.autograd.grad(dx, collocation_pts, torch.ones_like(dx), create_graph=True)[0]
    residual = ddx + 2*zeta*omega0*dx + (omega0**2)*prediction_colloc
    physics_loss = torch.mean(residual**2)

    loss = data_loss + (1e-4)*physics_loss

    loss.backward()
    optimizer.step()

    print(f"{iter+1}/{MAX_ITER} - loss: {loss.detach().cpu().numpy():.5e}, physics: {physics_loss.detach().cpu().numpy():.5e}", end='\r')
    
    # plot the result as training progresses
    if (iter+1) % 100 == 0: 
        
        prediction = model(t_inference).detach().cpu()

        plot_progress(t, GT, prediction, training_data, collocation_t)
        files.append(save_progress(save_dir, 'pinn', iter))
    
        if (iter+1) % 10000 == 0: plt.show()
        else: plt.close("all")

In [None]:
make_gif(files, "results/harmonic/pinn.gif")
IPython.display.Image(filename="results/harmonic/pinn.gif") 

### Data-Driven Discovery

Now, switching the gear a little bit, let's take a look at a different use of PINN, which is the discovery of physics. The previous example above was about finding the solution of a physics equation when the physics parameters were given. However, in the example we are going to see below, we assume that the physics parameters are unknown. For example, in real world, we may encounter a scenario where we make an observation of a physics phenomenon using sensors or imaging devices and want to figure out what physical parameters might govern the phenomena.

To emulate the scenario, we re-sample the training data as in the following:

In [None]:
Tmax_sample = 3.0        # include the entire interval for trainig
Nt_sample = int(Nt*Tmax_sample/Tmax)

t_sample = torch.unsqueeze(t[0:Nt_sample:sample_stride], -1)
GT_sample = GT[0:Nt_sample:sample_stride,:].clone()
GT_sample += 0.02*torch.randn(GT_sample.shape)     # add noise to emulate real-world

training_data = torch.hstack((t_sample, GT_sample)) 

plt.figure(figsize=(6,3))
plt.plot(t, GT[:,0], color='black', label = "Ground Truth")
plt.scatter(training_data[:,0], training_data[:,1], color='orange', label='Training Data')
plt.grid()
plt.xlabel("t")
plt.ylabel("x")
plt.title('Position')
plt.legend()

In [None]:
reset_parameters(model)

# Add zeta and omega0 to the list of parameters for optimization
# We will pretend that we do not know the true zeta and omega0.
omega_pred = torch.rand(1, device=device, requires_grad=True)   # GT: 2*pi = 6.283185307178
zeta_pred = torch.rand(1, device=device, requires_grad=True)    # GT: 0.15
optimizer = torch.optim.Adam(list(model.parameters()) + [zeta_pred, omega_pred],lr=5e-4)

# the rest of the process is pretty much the same as before...
files = []

save_dir = 'results/harmonic/pinn_discovery'
if not os.path.exists(save_dir):
    os.makedirs(save_dir)
    
MAX_ITER = 20000
N_COLLOCATION_POINTS = 30
input = training_data[:,:1].clone().detach().to(device)   # t
output = training_data[:,1:].clone().detach().to(device)  # x
collocation_t = torch.linspace(0,Tmax,N_COLLOCATION_POINTS)
collocation_pts = torch.unsqueeze(collocation_t, -1).clone().detach().requires_grad_(True).to(device)
t_inference = torch.unsqueeze(t,axis=-1).to(device)
for iter in range(MAX_ITER):
    optimizer.zero_grad()
    prediction = model(input)
    data_loss = torch.mean((output-prediction)**2)
    
    prediction_colloc = model(collocation_pts)
    dx  = torch.autograd.grad(prediction_colloc, collocation_pts, torch.ones_like(prediction_colloc), create_graph=True)[0]
    ddx  = torch.autograd.grad(dx, collocation_pts, torch.ones_like(dx), create_graph=True)[0]
    residual = ddx + 2*zeta_pred*omega_pred*dx + (omega_pred**2)*prediction_colloc  # note that we are using the predicted value of zeta and omega
    physics_loss = torch.mean(residual**2)

    loss = data_loss + (1e-4)*physics_loss

    loss.backward()
    optimizer.step()

    print(f"{iter+1}/{MAX_ITER} - loss: {loss.detach().cpu().numpy():.5e}, physics: {physics_loss.detach().cpu().numpy():.5e}, zeta: {zeta_pred.detach().cpu().item():.3e}, omega0: {omega_pred.detach().cpu().item():.3e}", end='\r')
    
    # plot the result as training progresses
    if (iter+1) % 100 == 0: 
        
        prediction = model(t_inference).detach().cpu()

        plot_progress(t, GT, prediction, training_data, collocation_t)
        files.append(save_progress(save_dir, 'pinn', iter))
    
        if (iter+1) % 5000 == 0: plt.show()
        else: plt.close("all")

In [None]:
make_gif(files, "results/harmonic/pinn_discovery.gif")
IPython.display.Image(filename="results/harmonic/pinn_discovery.gif") 

In [None]:
print(f"Ground Truth: zeta={zeta}, omega0={omega0}")
print(f"Discovered: zeta={zeta_pred.detach().item()}, omega0={omega_pred.detach().item()}")