In [None]:
# Import necessary libraries
import torch
import numpy as np

# Setting up the device for PyTorch
# CUDA will be used if available, otherwise CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Loading the data from a .npz file
data = np.load('5000_pos_max_norm.npz')

# Extracting input and output segments from the loaded data
input_segments = data['input_segments']
output_segments = data['output_segments']

# Reshaping the input and output segments for compatibility with the model
# Transposing the dimensions to match expected input format for GRU
input_segments_reshaped = np.transpose(input_segments, (2, 1, 0))
output_segments_reshaped = np.transpose(output_segments, (2, 1, 0))

In [None]:
from sklearn.model_selection import train_test_split

def split_dataset_and_get_stats(input_data, target_data, TRAIN_SPLIT=0.8, VAL_SPLIT=0.5):
    '''
    Split the dataset into training, validation, and testing sets.
    The function takes in input and target data, and the split ratios.
    It returns a dictionary containing the split datasets.
    '''
    # Splitting the data into training and other sets (validation + test)
    train_data_in, other_data_in = train_test_split(input_data, train_size=TRAIN_SPLIT, shuffle=True, random_state=123)
    train_data_out, other_data_out = train_test_split(target_data, train_size=TRAIN_SPLIT, shuffle=True, random_state=123)

    # Further splitting the other_data into validation and test sets
    val_data_in, test_data_in = train_test_split(other_data_in, train_size=VAL_SPLIT, shuffle=True, random_state=123)
    val_data_out, test_data_out = train_test_split(other_data_out, train_size=VAL_SPLIT, shuffle=True, random_state=123)

    # Compiling and returning the dataset splits
    dataset_splits = {
        "train": (train_data_in, train_data_out),
        "val": (val_data_in, val_data_out),
        "test": (test_data_in, test_data_out)
    }
    return dataset_splits

# Splitting the reshaped datasets into training, validation, and testing sets
dataset_splits = split_dataset_and_get_stats(input_segments_reshaped, output_segments_reshaped)

In [None]:
# Converting the split data into PyTorch DataLoader objects
def data_to_dataloader(train_inputs, train_targets, val_inputs, val_targets, test_inputs, test_targets, batch_size=64):
    '''
    Converts input and target sequences into PyTorch DataLoader objects.
    This is critical for batch processing during model training and evaluation.
    '''
    # Creating TensorDatasets for training, validation, and testing
    train_dataset = torch.utils.data.TensorDataset(torch.tensor(train_inputs, dtype=torch.float32), torch.tensor(train_targets, dtype=torch.float32))
    val_dataset = torch.utils.data.TensorDataset(torch.tensor(val_inputs, dtype=torch.float32), torch.tensor(val_targets, dtype=torch.float32))
    test_dataset = torch.utils.data.TensorDataset(torch.tensor(test_inputs, dtype=torch.float32), torch.tensor(test_targets, dtype=torch.float32))

    # Creating DataLoader objects for the datasets
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

    return train_loader, val_loader, test_loader

# Creating DataLoader objects for the split datasets
train_loader, val_loader, test_loader = data_to_dataloader(*dataset_splits['train'], *dataset_splits['val'], *dataset_splits['test'])

In [None]:
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

class TrajectoryPredictor(nn.Module):
    '''
    This class defines the Trajectory Predictor model using GRU.
    The model consists of encoding and decoding GRU layers and a fully connected layer.
    '''

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, dropout=0.5):
        '''
        Initialize the model with the specified parameters.
        input_dim, hidden_dim, and output_dim define the dimensions of the network layers.
        num_layers and dropout are used to configure the GRU layers.
        '''
        super(TrajectoryPredictor, self).__init__()
        self.hidden_dim = hidden_dim

        # Encoding layer with dropout for regularization
        self.gru1 = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout)

        # Decoding layer to generate output sequence
        self.gru2 = nn.GRU(hidden_dim, hidden_dim, num_layers, batch_first=True)

        # Fully connected layer for final output predictions
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        '''
        Forward pass of the model.
        Takes input x and produces the output through the GRU layers and fully connected layer.
        '''
        # Encoding
        out, h_n = self.gru1(x)

        # Decoding
        dec_input = torch.zeros(x.size(0), 10, self.hidden_dim).to(x.device)  # Initializing decoder input
        out, _ = self.gru2(dec_input, h_n)

        # Passing through the fully connected layer
        out = self.fc(out)
        return out

# Setting up model hyperparameters
input_dim = 3   # Dimension for 3D coordinates
hidden_dim = 64 # Number of hidden units in GRU
output_dim = 3  # Output dimension for 3D coordinates
num_layers = 2  # Number of GRU layers
dropout = 0.5   # Dropout rate for regularization

# Instantiating the model
model = TrajectoryPredictor(input_dim, hidden_dim, output_dim, num_layers, dropout)
model = model.to(device)


In [None]:
# Defining loss function and optimizer
criterion = nn.MSELoss() # Mean Squared Error Loss
optimizer = optim.Adam(model.parameters(), lr=0.001) # Adam optimizer
scheduler = StepLR(optimizer, step_size=50, gamma=0.1) # Learning rate scheduler

# Defining various performance metrics
def mse(y_true, y_pred):
    '''Calculates Mean Squared Error between true and predicted values.'''
    return ((y_true - y_pred) ** 2).mean()

def rmse(y_true, y_pred):
    '''Calculates Root Mean Squared Error between true and predicted values.'''
    return torch.sqrt(mse(y_true, y_pred))

def mae(y_true, y_pred):
    '''Calculates Mean Absolute Error between true and predicted values.'''
    return torch.abs(y_true - y_pred).mean()

def r2_score(y_true, y_pred):
    '''Calculates R-squared (coefficient of determination) score.'''
    ss_res = mse(y_true, y_pred) * y_true.numel()
    ss_tot = ((y_true - y_true.mean()) ** 2).sum()
    return 1 - ss_res / ss_tot

def adjusted_r2_score(y_true, y_pred, n, p):
    '''Calculates Adjusted R-squared score.'''
    r2 = r2_score(y_true, y_pred)
    return 1 - (1 - r2) * (n - 1) / (n - p - 1)

# Training and validation loops
n_epochs = 1000
patience = 100
early_stopping_counter = 0
best_val_loss = float('inf')

train_losses = []
val_losses = []

for epoch in range(n_epochs):
    # Training loop
    model.train()
    train_loss = 0.0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * inputs.size(0)
    train_loss /= len(train_loader.dataset)
    train_losses.append(train_loss)

    # Validation loop
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            val_loss += loss.item() * inputs.size(0)
    val_loss /= len(val_loader.dataset)

        # Logging the training and validation progress
    print(f'Epoch {epoch+1}/{n_epochs}, Train Loss: {train_loss:.7f}, Validation Loss: {val_loss:.7f}')

    # Learning Rate Scheduler Step
    scheduler.step()

    # Early Stopping Check
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Saving the best model state
        torch.save(model.state_dict(), '5000_pos_max_norm_64.pth')
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= patience:
            print('Stopping early due to lack of improvement in validation loss!')
            break

# Visualization of training and validation loss
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss Over Epochs')
plt.legend()
plt.savefig('./5000_pos_max_norm_64_loss_plot.png', bbox_inches='tight', dpi=300)
plt.close()




In [None]:
# Evaluating the model's performance
def evaluate_model(model, loader):
    '''
    Evaluate the model's performance on a given DataLoader.
    Calculates MSE, RMSE, MAE, R2, and Adjusted R2 metrics.
    '''
    model.eval()
    total_mse, total_rmse, total_mae, total_r2, total_adj_r2 = 0.0, 0.0, 0.0, 0.0, 0.0
    total_samples = 0

    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)

            total_mse += mse(targets, outputs) * inputs.size(0)
            total_rmse += rmse(targets, outputs) * inputs.size(0)
            total_mae += mae(targets, outputs) * inputs.size(0)
            total_r2 += r2_score(targets, outputs) * inputs.size(0)
            total_adj_r2 += adjusted_r2_score(targets, outputs, inputs.size(0), 3) * inputs.size(0)
            total_samples += inputs.size(0)

    return {
        'MSE': total_mse / total_samples,
        'RMSE': total_rmse / total_samples,
        'MAE': total_mae / total_samples,
        'R2': total_r2 / total_samples,
        'Adjusted R2': total_adj_r2 / total_samples
    }

# Loading the best model and evaluating on train, validation, and test sets
model.load_state_dict(torch.load('5000_pos_max_norm_64.pth'))
train_metrics = evaluate_model(model, train_loader)
val_metrics = evaluate_model(model, val_loader)
test_metrics = evaluate_model(model, test_loader)

# Displaying the evaluation metrics
print("Training Metrics:", train_metrics)
print("\nValidation Metrics:", val_metrics)
print("\nTest Metrics:", test_metrics)

In [None]:
# Plotting and saving individual trajectories
def plot_trajectories(inputs, targets, predictions, filename):
    '''
    Plot the input, target, and predicted trajectories in 3D.
    Saves the plot to a specified filename.
    '''
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')

    # Plotting the trajectories
    ax.plot(inputs[:, 0], inputs[:, 1], inputs[:, 2], label='Input Sequence', color='blue')
    ax.plot(targets[:, 0], targets[:, 1], targets[:, 2], label='True Future Trajectory', color='green')
    ax.plot(predictions[:, 0], predictions[:, 1], predictions[:, 2], label='Predicted Trajectory', color='red', linestyle='--')

    # Setting labels and title
    ax.set_xlabel('X Axis')
    ax.set_ylabel('Y Axis')
    ax.set_zlabel('Z Axis')
    ax.set_title('3D Trajectory Prediction')
    ax.legend()

    # Saving the plot
    plt.savefig(filename, bbox_inches='tight')
    plt.close()

# Generating and saving plots for a subset of test data
for i, (inputs, targets) in enumerate(test_loader):
    inputs, targets = inputs.to(device), targets.to(device)
    predictions = model(inputs)

    # Converting tensors to numpy arrays for plotting
    inputs_np = inputs.cpu().detach().numpy()
    targets_np = targets.cpu().detach().numpy()
    predictions_np = predictions.cpu().detach().numpy()

    # Plotting for the first few sequences in the batch
    for j in range(min(5, inputs_np.shape[0])):
        plot_filename = f'trajectory_{i}_{j}.png'
        plot_trajectories(inputs_np[j], targets_np[j], predictions_np[j], plot_filename)