In [11]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from scipy.integrate import solve_ivp
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from pathlib import Path 


In [12]:
# Physical Constants
G = 9.81  # Gravity (m/s^2)
BOUNCE_COEFF = 0.9  # Coefficient of restitution
MASS = 1.0  # Mass of the particle
RADIUS = 0.2  # Radius of the ball
DRAG_COEFF = 0.1  # Air resistance coefficient
box_size = 5.0 # Size of the box

# Simulation Parameters
n_balls = 100
output_dir = "BallDataset"
dt = 0.01  # Time step
seq_len = 160  # Input sequence length
pred_len = 40  # Prediction sequence length

# Model Parameters
LATENT_DIM = 16  # Latent space size
HIDDEN_DIM = 64  # Hidden layer size in dynamics
LIBRARY_DIM = 30  # Number of candidate functions for SINDy
INPUT_SIZE = (64, 64)  # Input image dimensions (H, W)

# Training Hyperparameters
NUM_EPOCHS = 100
BATCH_SIZE = 16
LEARNING_RATE = 1e-3
ALPHA, BETA, GAMMA, DELTA, EPSILON, ZETA = 1.0, 1.0, 1.0, 1e-4, 1.0, 1.0

In [13]:
def handle_collisions(state, radius, box_size, bounce_coeff):
    x, vx, y, vy = state
    if x - radius < -box_size:
        x = -box_size + radius
        vx = -bounce_coeff * vx
    elif x + radius > box_size:
        x = box_size - radius
        vx = -bounce_coeff * vx
    if y - radius < -box_size:
        y = -box_size + radius
        vy = -bounce_coeff * vy
    elif y + radius > box_size:
        y = box_size - radius
        vy = -bounce_coeff * vy
    return np.array([x, vx, y, vy])

In [14]:
def particle_dynamics_with_collisions(t, y, mass, radius, box_size, drag_coeff, bounce_coeff):
    x, vx, y_pos, vy = y
    dydt = np.zeros_like(y)
    dydt[0] = vx
    dydt[2] = vy
    speed = np.sqrt(vx**2 + vy**2)
    drag_force_x = -drag_coeff * speed * vx
    drag_force_y = -drag_coeff * speed * vy
    dydt[1] = drag_force_x / mass
    dydt[3] = -G + drag_force_y / mass
    return dydt

In [161]:
def generate_ball_dataset(n_balls, dt, seq_len, pred_len, box_size, radius=0.2, output_dir="BallDataset"):
    """
    Generate a dataset of ball trajectories and save them as GIFs.

    Args:
        n_balls: Number of trajectories to generate.
        dt: Time step for simulation.
        seq_len: Length of input sequences.
        pred_len: Length of prediction sequences.
        box_size: Size of the simulation box.
        radius: Radius of the ball.
        output_dir: Directory to save the dataset and generated GIFs.

    Returns:
        dataset: List of tuples containing trajectory data and corresponding GIF paths.
    """
    # Ensure the output directory path is absolute
    output_dir = os.path.abspath(output_dir)

    # Create the output directory
    os.makedirs(output_dir, exist_ok=True)

    dataset = []
    for i in tqdm(range(n_balls), desc="Generating dataset"):
        # Randomize parameters for the trajectory
        mass = np.random.uniform(1, 3)
        radius = mass/10
        drag_coeff = 0.1
        bounce_coeff = 0.9

        # Initial conditions for position and velocity
        init_pos = np.random.uniform(-box_size + radius, box_size - radius, 2)
        init_vel = np.random.uniform(-5.0, 5.0, 2)
        y0 = np.concatenate([init_pos, init_vel])

        # Time evaluation for the trajectory
        t_eval = np.linspace(0, (seq_len + pred_len) * dt, int(seq_len + pred_len))

        # Simulate the trajectory
        trajectory = []
        state = y0
        for t in t_eval:
            sol = solve_ivp(
                particle_dynamics_with_collisions,
                (t, t + dt),
                state,
                args=(mass, radius, box_size, drag_coeff, bounce_coeff),
                method="RK45",
                t_eval=[t + dt]
            )
            if sol.y.size == 0:
                break
            state = sol.y.flatten()
            state = handle_collisions(state, radius, box_size, bounce_coeff)
            trajectory.append(state)

        # Convert trajectory to numpy array
        trajectory = np.array(trajectory)

        # Path for the corresponding GIF
        gif_path = os.path.join(output_dir, f"ball_{i}.gif")

        # Generate and save the GIF
        generate_ball_gif(trajectory, radius, box_size, gif_path)

        # Append the trajectory and metadata to the dataset
        dataset.append((trajectory, gif_path))

    return dataset

In [162]:
def generate_ball_gif(trajectory, radius, box_size, save_path):
    """
    Generate and save a GIF of the ball trajectory.

    Args:
        trajectory: Numpy array containing the ball's positions and velocities.
        radius: Radius of the ball.
        box_size: Size of the box.
        save_path: File path to save the generated GIF.
    """
    x = trajectory[:, 0]
    y = trajectory[:, 2]

    fig, ax = plt.subplots(figsize=(6, 6))
    ax.set_xlim(-box_size, box_size)
    ax.set_ylim(-box_size, box_size)
    ax.set_aspect('equal')
    ball = plt.Circle((x[0], y[0]), radius, fc='blue')
    ax.add_patch(ball)

    def update(frame):
        ball.set_center((x[frame], y[frame]))
        return ball,

    ani = FuncAnimation(fig, update, frames=len(trajectory), blit=True, interval=50)
    ani.save(save_path, fps=20, writer='pillow')
    plt.close(fig)

In [15]:
# Function to load dataset from directory
def load_dataset_from_directory(directory):
    """
    Load a dataset of trajectories from the given directory.

    Args:
        directory: Path to the directory containing trajectory files (e.g., .npy files).

    Returns:
        trajectories: List of trajectories loaded from the directory.
    """
    trajectories = []
    for filename in os.listdir(directory):
        if filename.endswith(".npy"):  # Assuming the dataset is saved as .npy files
            file_path = os.path.join(directory, filename)
            trajectory = np.load(file_path)  # Load .npy file
            trajectories.append(trajectory)  # Append to the list of trajectories
    return trajectories

In [16]:
# BallTrajectoryDataset class
class BallTrajectoryDataset(Dataset):
    def __init__(self, trajectories, seq_len, pred_len):
        """
        Dataset class for ball trajectories.

        Args:
            trajectories: List of trajectories (numpy arrays).
            seq_len: Length of input sequence.
            pred_len: Length of prediction sequence.
        """
        self.trajectories = trajectories
        self.seq_len = seq_len
        self.pred_len = pred_len

    def __len__(self):
        """
        Return the number of trajectories in the dataset.
        """
        return len(self.trajectories)

    def __getitem__(self, idx):
        """
        Retrieve the input and target sequences for the given index.

        Args:
            idx: Index of the trajectory.

        Returns:
            input_seq: Input sequence (seq_len, state_dim).
            target_seq: Target sequence (pred_len, state_dim).
        """
        trajectory = self.trajectories[idx]  # Get the trajectory at the specified index
        input_seq = trajectory[:self.seq_len]  # Slice the input sequence
        target_seq = trajectory[self.seq_len:self.seq_len + self.pred_len]  # Slice the target sequence
        return torch.tensor(input_seq, dtype=torch.float32), torch.tensor(target_seq, dtype=torch.float32)

In [17]:
# Dataset splitting function
def split_dataset(trajectories, train_ratio=0.7, val_ratio=0.2, test_ratio=0.1):
    """
    Split dataset into train, validation, and test sets.

    Args:
        trajectories: List of trajectories (numpy arrays).
        train_ratio: Proportion of data for training.
        val_ratio: Proportion of data for validation.
        test_ratio: Proportion of data for testing.

    Returns:
        train_set: Training dataset.
        val_set: Validation dataset.
        test_set: Testing dataset.
    """
    total = train_ratio + val_ratio + test_ratio
    train_ratio /= total
    val_ratio /= total
    test_ratio /= total

    train_val, test_set = train_test_split(trajectories, test_size=test_ratio, random_state=42)
    train_set, val_set = train_test_split(train_val, test_size=val_ratio / (train_ratio + val_ratio), random_state=42)
    return train_set, val_set, test_set

In [18]:
# Load the dataset from the saved directory
dataset_path = r"C:\Users\User\Documents\GitHub\PIML\Complete Video Models\Ball Trajectory\BallDataset"

# Load trajectories
trajectories = load_dataset_from_directory(dataset_path)
print(f"Loaded {len(trajectories)} trajectories from {dataset_path}")

Loaded 0 trajectories from C:\Users\User\Documents\GitHub\PIML\Complete Video Models\Ball Trajectory\BallDataset


In [20]:
import os

# Check if the directory exists
if not os.path.exists(dataset_path):
    print(f"Error: Directory {dataset_path} does not exist.")
else:
    print(f"Directory {dataset_path} exists.")


Directory C:\Users\User\Documents\GitHub\PIML\Complete Video Models\Ball Trajectory\BallDataset exists.


In [21]:
# List all files in the directory
files_in_directory = os.listdir(dataset_path)
print(f"Files in {dataset_path}: {files_in_directory}")

Files in C:\Users\User\Documents\GitHub\PIML\Complete Video Models\Ball Trajectory\BallDataset: ['ball_0.gif', 'ball_1.gif', 'ball_10.gif', 'ball_100.gif', 'ball_101.gif', 'ball_102.gif', 'ball_103.gif', 'ball_104.gif', 'ball_105.gif', 'ball_106.gif', 'ball_107.gif', 'ball_108.gif', 'ball_109.gif', 'ball_11.gif', 'ball_110.gif', 'ball_111.gif', 'ball_112.gif', 'ball_113.gif', 'ball_114.gif', 'ball_115.gif', 'ball_116.gif', 'ball_117.gif', 'ball_118.gif', 'ball_119.gif', 'ball_12.gif', 'ball_120.gif', 'ball_121.gif', 'ball_122.gif', 'ball_123.gif', 'ball_124.gif', 'ball_125.gif', 'ball_126.gif', 'ball_127.gif', 'ball_128.gif', 'ball_129.gif', 'ball_13.gif', 'ball_130.gif', 'ball_131.gif', 'ball_132.gif', 'ball_133.gif', 'ball_134.gif', 'ball_135.gif', 'ball_136.gif', 'ball_137.gif', 'ball_138.gif', 'ball_139.gif', 'ball_14.gif', 'ball_140.gif', 'ball_141.gif', 'ball_142.gif', 'ball_143.gif', 'ball_144.gif', 'ball_145.gif', 'ball_146.gif', 'ball_147.gif', 'ball_148.gif', 'ball_149.gif', 

In [19]:
# Split dataset into train, validation, and test sets
train_set, val_set, test_set = split_dataset(trajectories)

# Initialize datasets for each split
train_dataset = BallTrajectoryDataset(train_set, seq_len=SEQ_LEN, pred_len=PRED_LEN)
val_dataset = BallTrajectoryDataset(val_set, seq_len=SEQ_LEN, pred_len=PRED_LEN)
test_dataset = BallTrajectoryDataset(test_set, seq_len=SEQ_LEN, pred_len=PRED_LEN)

# Create DataLoader objects for each split
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

ValueError: With n_samples=0, test_size=0.10000000000000002 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

In [None]:
# Inspect a batch to verify
for batch_inputs, batch_targets in train_loader:
    print(f"Input batch shape: {batch_inputs.shape}")
    print(f"Target batch shape: {batch_targets.shape}")
    break

In [None]:
class SINDyLayer(nn.Module):
    def __init__(self, input_dim, library_dim):
        super(SINDyLayer, self).__init__()
        self.coefficients = nn.Parameter(torch.randn(library_dim, input_dim))

    def forward(self, library, dz_dt):
        dz_dt_pred = library @ self.coefficients
        sindy_loss = torch.mean((dz_dt - dz_dt_pred) ** 2)
        return dz_dt_pred, sindy_loss

In [None]:
class PINNModel(nn.Module):
    def __init__(self, input_size, latent_dim, hidden_dim, library_dim):
        super(PINNModel, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(32 * (input_size[0] // 4) * (input_size[1] // 4), latent_dim),
        )
        self.dynamics = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 32 * (input_size[0] // 4) * (input_size[1] // 4)),
            nn.ReLU(),
            nn.Unflatten(1, (32, input_size[0] // 4, input_size[1] // 4)),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 3, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid(),
        )
        self.sindy_layer = SINDyLayer(latent_dim, library_dim)

    def forward(self, x):
        z = self.encoder(x)
        dz_dt = self.dynamics(z)
        x_reconstructed = self.decoder(z)
        return x_reconstructed, z, dz_dt

In [None]:
def reconstruction_loss(u_pred, u_true):
    """
    Compute the reconstruction loss.

    Args:
        u_pred: Predicted states (batch_size, seq_len, state_dim).
        u_true: Ground truth states (batch_size, seq_len, state_dim).

    Returns:
        loss: Mean squared error loss.
    """
    return torch.mean((u_pred - u_true) ** 2)

In [None]:
def sindy_loss(z, dz_dt, sindy_layer):
    """
    Compute the SINDy loss for sparse dynamics.

    Args:
        z: Latent variables (batch_size, latent_dim).
        dz_dt: Time derivatives of latent variables (batch_size, latent_dim).
        sindy_layer: SINDy dynamics layer.

    Returns:
        loss: SINDy loss value.
    """
    library = sindy_library(z)
    dz_dt_pred, loss = sindy_layer(library, dz_dt)
    return loss

In [None]:
def sparsity_loss(sindy_layer):
    """
    Compute the sparsity loss for SINDy coefficients.

    Args:
        sindy_layer: SINDy dynamics layer.

    Returns:
        loss: L1 norm of the SINDy coefficients.
    """
    return torch.sum(torch.abs(sindy_layer.coefficients))

In [None]:
def compute_energy(state, mass, G):
    """
    Compute total energy (kinetic + potential).

    Args:
        state: Tensor of shape (batch_size, state_dim).
        mass: Mass of the particle.
        G: Gravitational acceleration.

    Returns:
        energy: Tensor of total energy for each batch.
    """
    _, vx, y, vy = state.T

    # Kinetic energy
    kinetic_energy = 0.5 * mass * (vx**2 + vy**2)

    # Potential energy
    potential_energy = mass * G * y

    return kinetic_energy + potential_energy

In [None]:
def energy_loss(u_pred, u_true, mass, G):
    """
    Compute the energy loss between predicted and true states.

    Args:
        u_pred: Predicted states (batch_size, seq_len, state_dim).
        u_true: True states (batch_size, seq_len, state_dim).
        mass: Mass of the particle.
        G: Gravitational acceleration.

    Returns:
        loss: Mean squared error of energy differences.
    """
    predicted_energy = compute_energy(u_pred, mass, G)
    true_energy = compute_energy(u_true, mass, G)
    return torch.mean((predicted_energy - true_energy) ** 2)

In [None]:
def ode_loss(predicted_state, true_state, mass, drag_coeff, G):
    """
    Compute the ODE residual loss.

    Args:
        predicted_state: Predicted states (batch_size, state_dim).
        true_state: True states (batch_size, state_dim).
        mass: Mass of the particle.
        drag_coeff: Drag coefficient.
        G: Gravitational acceleration.

    Returns:
        loss: Residual loss enforcing the governing equations.
    """
    x, vx, y, vy = predicted_state.T
    speed = torch.sqrt(vx**2 + vy**2)
    drag_force_x = -drag_coeff * speed * vx
    drag_force_y = -drag_coeff * speed * vy
    dx_dt = vx
    dy_dt = vy
    dvx_dt = drag_force_x / mass
    dvy_dt = -G + drag_force_y / mass
    dx_dt_pred, dvx_dt_pred, dy_dt_pred, dvy_dt_pred = true_state.T
    return torch.mean((dx_dt - dx_dt_pred)**2 + (dvx_dt - dvx_dt_pred)**2 +
                      (dy_dt - dy_dt_pred)**2 + (dvy_dt - dvy_dt_pred)**2)

In [None]:
def collision_loss(predicted_state, true_state, radius, box_size, bounce_coeff):
    """
    Compute the collision loss based on boundary interactions.

    Args:
        predicted_state: Predicted states (batch_size, state_dim).
        true_state: True states (batch_size, state_dim).
        radius: Radius of the particle.
        box_size: Size of the box.
        bounce_coeff: Coefficient of restitution for collisions.

    Returns:
        loss: Collision loss value.
    """
    x, vx, y, vy = predicted_state.T
    x_collision = (x - radius < -box_size) | (x + radius > box_size)
    y_collision = (y - radius < -box_size) | (y + radius > box_size)
    vx_true, vy_true = true_state[:, 1], true_state[:, 3]
    loss = torch.mean(x_collision * (vx + bounce_coeff * vx_true)**2) + \
           torch.mean(y_collision * (vy + bounce_coeff * vy_true)**2)
    return loss

In [None]:
def full_combined_loss(u_pred, u_true, z, dz_dt, sindy_layer, mass, drag_coeff, G, radius, box_size, bounce_coeff,
                       alpha=1.0, beta=1.0, gamma=1.0, delta=1.0, epsilon=1.0, zeta=1.0):
    """
    Compute the full combined loss.

    Args:
        u_pred: Predicted states.
        u_true: True states.
        z: Latent variables.
        dz_dt: Time derivatives of latent variables.
        sindy_layer: SINDy dynamics layer.
        mass, drag_coeff, G: Physical parameters.
        radius, box_size, bounce_coeff: Collision parameters.
        alpha, beta, gamma, delta, epsilon, zeta: Weights for loss components.

    Returns:
        total_loss: Combined loss value.
    """
    # Compute individual losses
    reconstruction = reconstruction_loss(u_pred, u_true)
    sindy = sindy_loss(z, dz_dt, sindy_layer)
    sparsity = sparsity_loss(sindy_layer)
    ode = ode_loss(u_pred, u_true, mass, drag_coeff, G)
    collision = collision_loss(u_pred, u_true, radius, box_size, bounce_coeff)
    energy = energy_loss(u_pred, u_true, mass, G)

    # Total combined loss
    return (alpha * reconstruction +
            beta * sindy +
            gamma * sparsity +
            delta * ode +
            epsilon * collision +
            zeta * energy)

In [None]:
def train_model_with_validation(model, sindy_layer, train_loader, val_loader, optimizer, epochs, mass, drag_coeff, G,
                                radius, box_size, bounce_coeff, alpha, beta, gamma, delta, epsilon, zeta):
    """
    Train the PINN + SINDy model with validation and print accuracy.

    Args:
        model: PINNModel instance.
        sindy_layer: SINDy dynamics layer.
        train_loader: DataLoader for training data.
        val_loader: DataLoader for validation data.
        optimizer: Optimizer for training.
        epochs: Number of training epochs.
        mass, drag_coeff, G: Physical parameters.
        radius, box_size, bounce_coeff: Collision parameters.
        alpha, beta, gamma, delta, epsilon, zeta: Weights for different loss components.
    """
    model.train()
    sindy_layer.train()

    for epoch in range(epochs):
        train_loss = 0.0
        train_accuracy = 0.0
        val_loss = 0.0
        val_accuracy = 0.0

        # Training Phase
        for inputs, targets in train_loader:
            optimizer.zero_grad()

            # Forward pass
            u_pred, z, dz_dt = model(inputs)

            # Compute loss
            loss = full_combined_loss(u_pred, targets, z, dz_dt, sindy_layer, mass, drag_coeff, G,
                                      radius, box_size, bounce_coeff, alpha, beta, gamma, delta, epsilon, zeta)

            # Backpropagation
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

            # Compute accuracy
            batch_accuracy = 1 - (torch.norm(u_pred - targets) / torch.norm(targets))
            train_accuracy += batch_accuracy.item()

        # Validation Phase
        model.eval()
        with torch.no_grad():
            for inputs, targets in val_loader:
                # Forward pass
                u_pred, z, dz_dt = model(inputs)

                # Compute loss
                loss = full_combined_loss(u_pred, targets, z, dz_dt, sindy_layer, mass, drag_coeff, G,
                                          radius, box_size, bounce_coeff, alpha, beta, gamma, delta, epsilon, zeta)
                val_loss += loss.item()

                # Compute accuracy
                batch_accuracy = 1 - (torch.norm(u_pred - targets) / torch.norm(targets))
                val_accuracy += batch_accuracy.item()

        # Print losses and accuracies for the epoch
        print(f"Epoch {epoch + 1}/{epochs}, "
              f"Train Loss: {train_loss / len(train_loader):.4f}, "
              f"Train Accuracy: {train_accuracy / len(train_loader):.4f}, "
              f"Validation Loss: {val_loss / len(val_loader):.4f}, "
              f"Validation Accuracy: {val_accuracy / len(val_loader):.4f}")

        model.train()

In [None]:
def evaluate_model_on_test(model, test_loader, mass, drag_coeff, G, radius, box_size, bounce_coeff,
                           alpha, beta, gamma, delta, epsilon, zeta):
    """
    Evaluate the model on the test set and compute accuracy.

    Args:
        model: Trained PINNModel instance.
        test_loader: DataLoader for test data.
        mass, drag_coeff, G: Physical parameters.
        radius, box_size, bounce_coeff: Collision parameters.
        alpha, beta, gamma, delta, epsilon, zeta: Weights for loss components.

    Returns:
        test_loss: Average test loss.
        test_accuracy: Average test accuracy.
    """
    model.eval()
    test_loss = 0.0
    test_accuracy = 0.0
    with torch.no_grad():
        for inputs, targets in test_loader:
            # Forward pass
            u_pred, z, dz_dt = model(inputs)

            # Compute loss
            loss = full_combined_loss(u_pred, targets, z, dz_dt, model.sindy_layer, mass, drag_coeff, G,
                                      radius, box_size, bounce_coeff, alpha, beta, gamma, delta, epsilon, zeta)
            test_loss += loss.item()

            # Compute accuracy
            batch_accuracy = 1 - (torch.norm(u_pred - targets) / torch.norm(targets))
            test_accuracy += batch_accuracy.item()

    avg_test_loss = test_loss / len(test_loader)
    avg_test_accuracy = test_accuracy / len(test_loader)
    print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {avg_test_accuracy:.4f}")
    return avg_test_loss, avg_test_accuracy

In [None]:
# Initialize model and optimizer
model = PINNModel(input_size=INPUT_SIZE, latent_dim=LATENT_DIM, hidden_dim=HIDDEN_DIM, library_dim=LIBRARY_DIM)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Train the model with validation
train_model_with_validation(model, model.sindy_layer, train_loader, val_loader, optimizer, NUM_EPOCHS, MASS,
                            DRAG_COEFF, G, RADIUS, BOX_SIZE, BOUNCE_COEFF, ALPHA, BETA, GAMMA, DELTA, EPSILON, ZETA)

# Evaluate the model on the test set
test_loss, test_accuracy = evaluate_model_on_test(model, test_loader, MASS, DRAG_COEFF, G, RADIUS, BOX_SIZE, BOUNCE_COEFF,
                                                  ALPHA, BETA, GAMMA, DELTA, EPSILON, ZETA)