In [1]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error


In [2]:
# Check for GPU Availability
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("GPU is available.")
else:
    device = torch.device("cpu")
    print("GPU is not available; using CPU.")

GPU is available.


In [50]:
data = pd.read_csv('Wave_Current_ Speeds_TimeSeries(3.5m).csv', index_col= ['DateTime'], parse_dates=['DateTime'])


In [51]:
X = data[['Wave Speed']].values # Input: Wave Speed
y = data['Current speed'].values # Output: Current Speed


In [52]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)


In [53]:
# Convert the NumPy arrays to PyTorch tensors and move to GPU
X_train = torch.from_numpy(X_train).float().to(device)
X_test = torch.from_numpy(X_test).float().to(device)
y_train = torch.from_numpy(y_train).float().to(device)
y_test = torch.from_numpy(y_test).float().to(device)

In [54]:
X_train = X_train.requires_grad_()


In [55]:
g = 9.81
f = 1.0e-4
h_x = 1.5
h_y = 1.5
dt = 0.01

In [56]:
# Define initial conditions for u and v at t = 0
def initial_conditions(x, y):
    u_initial = torch.sin(2 * np.pi * x) * torch.cos(np.pi * y)
    v_initial = -torch.cos(2 * np.pi * x) * torch.sin(np.pi * y)
    return u_initial, v_initial

# Define boundary conditions 
def boundary_conditions(x, y, t):
    u_bc = torch.zeros_like(x)  # Example: u = 0 at the boundaries
    v_bc = torch.zeros_like(x)  # Example: v = 0 at the boundaries
    return u_bc, v_bc


In [57]:
# Define spatial derivatives
def spatial_derivatives(u, x, y):
    u_x = torch.autograd.grad(u, x, grad_outputs=torch.ones_like(u), create_graph=True, retain_graph=True)[0]
    u_y = torch.autograd.grad(u, y, grad_outputs=torch.ones_like(u), create_graph=True, retain_graph=True)[0]
    return u_x, u_y

In [58]:
# Implement the Runge-Kutta time integration
def runge_kutta(u, v, f, g, h_x, h_y, x, y, t, dt):
    # Compute k1, k2, k3, and k4 for both u and v
    k1_u, k1_v = pinn_physics(u, v, f, g, h_x, h_y, x, y, t)
    k2_u, k2_v = pinn_physics(u + 0.5 * dt * k1_u, v + 0.5 * dt * k1_v, f, g, h_x, h_y, x, y, t + 0.5 * dt)
    k3_u, k3_v = pinn_physics(u + 0.5 * dt * k2_u, v + 0.5 * dt * k2_v, f, g, h_x, h_y, x, y, t + 0.5 * dt)
    k4_u, k4_v = pinn_physics(u + dt * k3_u, v + dt * k3_v, f, g, h_x, h_y, x, y, t + dt)
    
    # Update u and v using the Runge-Kutta formula
    u_new = u + (dt / 6) * (k1_u + 2 * k2_u + 2 * k3_u + k4_u)
    v_new = v + (dt / 6) * (k1_v + 2 * k2_v + 2 * k3_v + k4_v)
    
    return u_new, v_new


In [59]:
# Define your physics-based loss
def pinn_physics(u, v, f, g, h_x, h_y, x, y, t):
    # Compute spatial derivatives
    u_x, u_y = spatial_derivatives(u, x, y)
    v_x, v_y = spatial_derivatives(v, x, y)
    
    # Define temporal derivatives
    dt = 0.01  # Define your desired time step
    u_t, v_t = runge_kutta(u, v, f, g, h_x, h_y, x, y, t, dt)
    
    # Define the physics equations
    physics_equations = [
        u_t + u * u_x + v * u_y + f * u - g * h_x,
        v_t + u * v_x + v * v_y - f * v - g * h_y
    ]
    
    return physics_equations


In [60]:
# Define the physics loss function
def physics_loss(u, v, f, g, h_x, h_y, x, y, t):
    predictions = torch.cat([u, v], dim=0)
    physics_equations = pinn_physics(u, v, f, g, h_x, h_y, x, y, t)
    data_loss = torch.mean(torch.abs(predictions - y))
    physics_loss_value = torch.mean(torch.abs(physics_equations))
    total_loss = data_loss + physics_loss_value
    return physics_loss_value, data_loss, total_loss


In [61]:
class CurrentSpeedPINN(torch.nn.Module):
    def __init__(self):
        super(CurrentSpeedPINN, self).__init__()
        self.dense1 = torch.nn.Linear(1, 32)  # First hidden layer with 32 neurons
        self.dense2 = torch.nn.Linear(32, 32)  # Second hidden layer with 32 neurons
        self.output_layer = torch.nn.Linear(32, 2)  # Output layer with two neurons for predicting u and v

    def forward(self, inputs):
        x = self.dense1(inputs)  # Pass input through the first hidden layer
        x = self.dense2(x)  # Pass through the second hidden layer
        output = self.output_layer(x)  # Output prediction
        return output

In [62]:
# Define training hyperparameters
learning_rate = 0.009  # Learning rate for optimization
epochs = 100  # Number of training epochs

model = CurrentSpeedPINN().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


In [64]:
# Training loop
losses = {'physics_loss': [], 'data_loss': [], 'total_loss': []}
t = 0.0  # Initial time

for epoch in range(epochs):
    optimizer.zero_grad()
    
    t += 0.01  # Increment time by the desired time step
    
    predictions = model(X_train)
    
    u_pred = predictions[:, 0]  # Extract u predictions
    v_pred = predictions[:, 0]  # Extract v predictions
    
    physics_loss_value, data_loss, total_loss = physics_loss(u_pred, v_pred, f, g, h_x, h_y, X_train, X_train, t)
    
    total_loss.backward()
    optimizer.step()
    
    losses['physics_loss'].append(physics_loss_value.item())
    losses['data_loss'].append(data_loss.item())
    losses['total_loss'].append(total_loss.item())
    
    if epoch % 10 == 0:
        print(f"Epoch [{epoch}/{epochs}] - Physics Loss: {physics_loss_value.item()}, Data Loss: {data_loss.item()}, Total Loss: {total_loss.item()}")

RecursionError: maximum recursion depth exceeded while calling a Python object

In [49]:
# Training Loop
losses = {'physics_loss': [], 'data_loss': [], 'total_loss': []}
t = 0.0  # Initial time

for epoch in range(epochs):
    optimizer.zero_grad()
    
    # Update time 't' at each epoch
    t += 1.0  # You should adjust the time increment according to your problem
    
    # Forward pass to get model predictions
    predictions = model(X_train)
    
    # Extract u and v predictions
    u_pred = predictions[:X_train.size(0)]
    v_pred = predictions[X_train.size(0):]
    
    # Calculate losses
    physics_loss_value, data_loss, total_loss = physics_loss(u_pred, v_pred, f, g, h_x, h_y, X_train, X_train, t)
    
    total_loss.backward()
    optimizer.step()
    
    # Append losses to the dictionary for analysis
    losses['physics_loss'].append(physics_loss_value.item())
    losses['data_loss'].append(data_loss.item())
    losses['total_loss'].append(total_loss.item())
    
    if epoch % 10 == 0:
        print(f"Epoch [{epoch}/{epochs}] - Physics Loss: {physics_loss_value.item()}, Data Loss: {data_loss.item()}, Total Loss: {total_loss.item()}")

TypeError: physics_loss() missing 1 required positional argument: 'dt'

In [None]:
# After training, evaluate the model on the test data
with torch.no_grad():
    y_pred = model(X_test)

    # Convert tensors to NumPy arrays for plotting
    y_test_numpy = y_test.cpu().numpy()
    y_pred_numpy = y_pred.cpu().numpy()

    mse = torch.mean(torch.square(y_test - y_pred)).item()
    mae = torch.mean(torch.abs(y_test - y_pred)).item()
    rmse = torch.sqrt(torch.mean(torch.square(y_test - y_pred)).item())

    print(f"Mean Squared Error: {mse}")
    print(f"Mean Absolute Error: {mae}")
    print(f"Root Mean Squared Error: {rmse}")


In [None]:
# Plot loss curves
plt.figure(figsize=(10, 5))
plt.plot(range(epochs), losses['physics_loss'], label='Physics Loss')
plt.plot(range(epochs), losses['data_loss'], label='Data Loss')
plt.plot(range(epochs), losses['total_loss'], label='Total Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()