In [None]:
import csv
import os
import random
import socket
from datetime import datetime

import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from ipywidgets import fixed, interact
from PIL import Image
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader

# Seed every RNG this notebook can draw from so that Restart & Run All gives the
# same weight initialisation and the same DataLoader shuffling order each time.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Prefer the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
if torch.cuda.is_available():
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")

## **Preprocess Raw Data for Training**

In [None]:
# Divisor applied to both the map image size and the CSV coordinates below.
# NOTE(review): presumably the number of image pixels per metre — confirm.
PIXEL_TO_METER_SCALE = 13.913

# Load the floor plan, shrink each side by the scale factor (truncating to whole
# pixels) and flip it vertically.
_floor_plan = Image.open("data/map/hkust_4f.jpg")
_scaled_size = tuple(int(side / PIXEL_TO_METER_SCALE) for side in _floor_plan.size)
image = _floor_plan.resize(_scaled_size).transpose(Image.FLIP_TOP_BOTTOM)

def process_csv_file(csv_path, image_size, pixel_to_meter_scale):
    """Read one trajectory CSV into column-wise lists.

    The first line is treated as a header and skipped. Columns are read by
    position: 0 -> Bv, 1 -> Bh, 2 -> Bp, 3 -> x, 4 -> y. Completely blank lines
    are ignored, and an empty file yields empty lists.

    Args:
        csv_path: Path of the CSV file to read.
        image_size: Indexable whose element ``[1]`` is the offset used to flip
            the y axis (the height when a PIL ``Image.size`` tuple is passed).
        pixel_to_meter_scale: Divisor applied to the raw x and y values.

    Returns:
        dict with keys ``'x'``, ``'y'``, ``'Bv'``, ``'Bh'``, ``'Bp'``, each
        mapping to a list of floats with one entry per data row.

    Raises:
        IndexError: If a non-blank row has fewer than five fields.
        ValueError: If a field cannot be parsed as a float.
    """
    path_data = {'x': [], 'y': [], "Bv": [], "Bh": [], "Bp": []}

    # newline='' lets the csv module do its own newline handling, as its
    # documentation requires for file objects.
    with open(csv_path, 'r', newline='') as file:
        reader = csv.reader(file)
        # Skip the header; the default avoids StopIteration on an empty file.
        next(reader, None)

        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines (e.g. a trailing newline).
                continue

            x = float(row[3]) / pixel_to_meter_scale
            # Negate and offset so y is measured from the opposite image edge.
            y = - float(row[4]) / pixel_to_meter_scale + image_size[1]

            path_data["x"].append(x)
            path_data["y"].append(y)
            path_data["Bv"].append(float(row[0]))
            path_data["Bh"].append(float(row[1]))
            path_data["Bp"].append(float(row[2]))

    return path_data


def _load_trajectory_dir(directory, image_size, pixel_to_meter_scale):
    """Parse every ``.csv`` file found (recursively) under ``directory``.

    Files are processed in sorted path order so the trajectory order, and
    everything derived from it, does not depend on the order in which the
    filesystem happens to list entries.

    Args:
        directory: Root folder to walk.
        image_size: Forwarded to ``process_csv_file``.
        pixel_to_meter_scale: Forwarded to ``process_csv_file``.

    Returns:
        list of the dicts returned by ``process_csv_file``, one per CSV file.

    Raises:
        FileNotFoundError: If no ``.csv`` file exists under ``directory``
            (``os.walk`` silently yields nothing for a missing folder).
    """
    csv_paths = sorted(
        os.path.join(root, file_name)
        for root, _, files in os.walk(directory)
        for file_name in files
        if file_name.endswith('.csv')
    )
    if not csv_paths:
        raise FileNotFoundError(f"No .csv trajectory files found under '{directory}'")
    return [process_csv_file(path, image_size, pixel_to_meter_scale) for path in csv_paths]


data_path = os.path.join(".", "data", "formatted", "HKUST_4F", "training data")
train_raw_data = _load_trajectory_dir(data_path, image.size, PIXEL_TO_METER_SCALE)

test_data_path = os.path.join(".", "data", "formatted", "HKUST_4F", "testing data")
test_raw_data = _load_trajectory_dir(test_data_path, image.size, PIXEL_TO_METER_SCALE)

## **Print and Visualize**

In [None]:
def print_data_info(data, name):
    """Print a short size summary for a list of trajectories.

    Args:
        data: Sequence of trajectory dicts; the length of each dict's ``'x'``
            list is taken as that trajectory's length.
        name: Label printed in the first line of the summary.
    """
    lengths = []
    for trajectory in data:
        lengths.append(len(trajectory['x']))

    report = (
        f"{name} data:",
        f"  Total length: {sum(lengths)}",
        f"  Individual lengths: {lengths}",
        f"  Number of trajectories: {len(data)}",
        "",  # trailing blank line separating consecutive summaries
    )
    print("\n".join(report))


# Summarise both raw splits, training first.
for _split_name, _split_data in (("Train Raw", train_raw_data), ("Test Raw", test_raw_data)):
    print_data_info(_split_data, _split_name)


## **Process Data to Sequences**

In [None]:
def prepare_sequences(data, sequence_length):
    """Turn trajectories into sliding-window inputs and next-step targets.

    For every trajectory and every start index ``i`` with
    ``i + sequence_length < len(trajectory)``, one sample is produced:
    the input is the ``(Bv, Bh, Bp)`` rows ``i .. i+sequence_length-1`` and the
    target is the ``(x, y)`` row at index ``i + sequence_length``.
    Trajectories with ``sequence_length`` or fewer rows contribute no samples.

    Args:
        data: Iterable of dicts with equal-length sequences under the keys
            ``'Bv'``, ``'Bh'``, ``'Bp'``, ``'x'`` and ``'y'``.
        sequence_length: Number of consecutive rows per input window (>= 1).

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n, sequence_length, 3)`` and ``y``
        has shape ``(n, 2)``. When no window can be built, correctly shaped
        empty arrays are returned (``n == 0``) instead of 1-D empty arrays, so
        callers can still rely on ``X.shape[-1]``.

    Raises:
        ValueError: If ``sequence_length`` is smaller than 1.
    """
    if sequence_length < 1:
        raise ValueError(f"sequence_length must be >= 1, got {sequence_length}")

    # Row offsets inside one window; broadcast against the window start indices.
    offsets = np.arange(sequence_length)
    windows, targets = [], []

    for traj in data:
        input_seq = np.column_stack((traj['Bv'], traj['Bh'], traj['Bp']))
        output_seq = np.column_stack((traj['x'], traj['y']))

        n_windows = len(input_seq) - sequence_length
        if n_windows <= 0:
            continue  # too short to yield a window plus a following target

        starts = np.arange(n_windows)[:, None]
        # Fancy indexing with an (n_windows, sequence_length) index grid builds
        # all windows of this trajectory in a single operation.
        windows.append(input_seq[starts + offsets])
        targets.append(output_seq[np.arange(sequence_length, sequence_length + n_windows)])

    if not windows:
        return np.empty((0, sequence_length, 3)), np.empty((0, 2))

    return np.concatenate(windows), np.concatenate(targets)

def _scale_windows(scaler, windows, fit=False):
    """Standardise a window array feature-wise.

    The array is flattened to 2-D on its last axis, passed through ``scaler``
    (fitting it first when ``fit`` is true) and reshaped back.
    """
    flat = windows.reshape(-1, windows.shape[-1])
    scaled = scaler.fit_transform(flat) if fit else scaler.transform(flat)
    return scaled.reshape(windows.shape)


# Number of consecutive samples fed to the model per prediction
sequence_length = 10

# Window the raw trajectories
X_train_val, y_train_val = prepare_sequences(train_raw_data, sequence_length)
X_test, y_test = prepare_sequences(test_raw_data, sequence_length)

# Hold out 20% of the training windows for validation.
# NOTE(review): windows built from the same trajectory overlap, and this split
# is made per window, so near-identical inputs can land in both the train and
# validation sets. Validation loss may therefore be optimistic; consider
# splitting by trajectory instead.
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val, test_size=0.2, random_state=42
)

# Inputs: statistics are fitted on the training windows only, then reused
scaler_X = StandardScaler()
X_train_scaled = _scale_windows(scaler_X, X_train, fit=True)
X_val_scaled = _scale_windows(scaler_X, X_val)
X_test_scaled = _scale_windows(scaler_X, X_test)

# Targets: same policy, fit on train and apply to the other splits
scaler_y = StandardScaler()
y_train_scaled = scaler_y.fit_transform(y_train)
y_val_scaled = scaler_y.transform(y_val)
y_test_scaled = scaler_y.transform(y_test)

print("Training data shape:", X_train_scaled.shape, y_train_scaled.shape)
print("Validation data shape:", X_val_scaled.shape, y_val_scaled.shape)
print("Testing data shape:", X_test_scaled.shape, y_test_scaled.shape)


## **Define the dataset and dataloader**

In [None]:
class TrajectoryDataset(Dataset):
    """In-memory dataset pairing input windows with their targets.

    Both arrays are converted to ``float32`` tensors once at construction, so
    indexing only slices existing tensors.
    """

    def __init__(self, X, y):
        """Store ``X`` and ``y`` as float32 tensors.

        Args:
            X: Array-like of inputs, one sample per index of the first axis.
            y: Array-like of targets with the same first-axis length as ``X``.

        Raises:
            ValueError: If ``X`` and ``y`` hold a different number of samples.
        """
        # torch.as_tensor replaces the legacy torch.FloatTensor constructor.
        # Inputs that are already float32 may be shared rather than copied.
        features = torch.as_tensor(X, dtype=torch.float32)
        targets = torch.as_tensor(y, dtype=torch.float32)
        if len(features) != len(targets):
            raise ValueError(
                f"X and y must contain the same number of samples, "
                f"got {len(features)} and {len(targets)}"
            )
        self.X = features
        self.y = targets

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` pair at position ``idx``."""
        return self.X[idx], self.y[idx]

# Mini-batch size shared by all three loaders
batch_size = 64

# Wrap each scaled split in a dataset (train, validation, test — in that order)
train_dataset, val_dataset, test_dataset = (
    TrajectoryDataset(inputs, targets)
    for inputs, targets in (
        (X_train_scaled, y_train_scaled),
        (X_val_scaled, y_val_scaled),
        (X_test_scaled, y_test_scaled),
    )
)

# Only the training loader shuffles; validation and test keep dataset order
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

## **Define the RNN model**


In [None]:
class TrajectoryRNN(nn.Module):
    """Single-layer GRU followed by a linear read-out of its final hidden state."""

    def __init__(self, input_size, hidden_size, output_size):
        """Build the GRU encoder and the linear output head.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the GRU hidden state.
            output_size: Number of values predicted per sequence.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.GRU(input_size=input_size, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Map a batch-first input sequence to one prediction per sequence."""
        # nn.GRU returns (all step outputs, final hidden state); only the latter is used.
        final_hidden = self.rnn(x)[1]
        # Drop the leading layer axis (size 1 for this single-layer GRU).
        return self.fc(final_hidden.squeeze(0))

# Network dimensions
input_size = 3  # Bv, Bh, Bp
hidden_size = 64
output_size = 2  # x, y

# Build the network and move its parameters to the selected device
model = TrajectoryRNN(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=output_size,
)
model = model.to(device)

# Mean-squared-error objective optimised with Adam
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

## **Train the model**

In [None]:
num_epochs = 50
patience = 5  # epochs without validation improvement tolerated before stopping
best_val_loss = float('inf')
no_improve = 0


def _mean_batch_loss(loader, training):
    """Run one full pass over ``loader`` and return the mean per-batch loss.

    With ``training`` true the model is put in train mode and the optimizer
    takes one step per batch; otherwise the model is put in eval mode and
    gradients are disabled.
    """
    model.train(training)
    running = 0
    with torch.set_grad_enabled(training):
        for features, targets in loader:
            features, targets = features.to(device), targets.to(device)

            if training:
                optimizer.zero_grad()
            batch_loss = criterion(model(features), targets)
            if training:
                batch_loss.backward()
                optimizer.step()

            running += batch_loss.item()
    return running / len(loader)


for epoch in range(num_epochs):
    train_loss = _mean_batch_loss(train_loader, training=True)
    val_loss = _mean_batch_loss(val_loader, training=False)

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

    # Checkpoint on improvement and reset the patience counter
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')
        no_improve = 0
        continue

    # No improvement this epoch: stop once the patience budget is used up
    no_improve += 1
    if no_improve == patience:
        print("Early stopping!")
        break

# Restore the weights of the best validation epoch
model.load_state_dict(torch.load('best_model.pth'))

## **Evaluate the model on Test Set**

In [None]:
def evaluate_model(model, test_loader, scaler_y, device):
    """Run ``model`` over ``test_loader`` and score it in the original target units.

    Predictions and targets are collected batch by batch, mapped back through
    ``scaler_y.inverse_transform`` and compared with the module-level
    ``mean_squared_error``, ``mean_absolute_error`` and ``r2_score`` functions.

    Args:
        model: Module called as ``model(batch_X)``; it is switched to eval mode.
        test_loader: Iterable yielding ``(batch_X, batch_y)`` tensor pairs.
        scaler_y: Object exposing ``inverse_transform`` for the targets.
        device: Device the input batches are moved to before the forward pass.

    Returns:
        dict with the un-scaled ``'predictions'`` and ``'true_values'`` arrays
        plus the scalar metrics ``'mse'``, ``'mae'``, ``'rmse'``,
        ``'max_error'`` (largest absolute element-wise error) and ``'r2'``.

    Raises:
        ValueError: If ``test_loader`` yields no batches.
    """
    model.eval()
    predictions = []
    true_values = []

    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            outputs = model(batch_X.to(device))
            predictions.append(outputs.cpu().numpy())
            # Targets are only compared on the host, so they are never moved
            # to the device; .cpu() is a no-op for tensors already on the CPU.
            true_values.append(batch_y.cpu().numpy())

    if not predictions:
        raise ValueError("test_loader yielded no batches; there is nothing to evaluate")

    predictions = np.concatenate(predictions)
    true_values = np.concatenate(true_values)

    # Undo the target standardisation so metrics are in the original units.
    predictions_original = scaler_y.inverse_transform(predictions)
    true_values_original = scaler_y.inverse_transform(true_values)

    mse = mean_squared_error(true_values_original, predictions_original)
    mae = mean_absolute_error(true_values_original, predictions_original)
    rmse = np.sqrt(mse)
    max_error = np.max(np.abs(predictions_original - true_values_original))
    r2 = r2_score(true_values_original, predictions_original)

    return {
        'predictions': predictions_original,
        'true_values': true_values_original,
        'mse': mse,
        'mae': mae,
        'rmse': rmse,
        'max_error': max_error,
        'r2': r2
    }

# Score the freshly trained model on the held-out test split
results = evaluate_model(model, test_loader, scaler_y, device)

# Assemble the whole report first, then emit it in one go
_metric_report = [
    "\nMetrics for the trained model:",
    f"Mean Squared Error: {results['mse']:.4f}",
    f"Mean Absolute Error: {results['mae']:.4f}",
    f"Root Mean Squared Error: {results['rmse']:.4f}",
    f"Maximum Absolute Error: {results['max_error']:.4f}",
    f"R-squared Score: {results['r2']:.4f}",
]
print("\n".join(_metric_report))

## **Interactive Visualization**

In [None]:
def plot_trajectory(trajectory_index, results):
    """Plot one 100-sample segment of true versus predicted positions.

    The prediction arrays are treated as consecutive chunks of 100 rows;
    ``trajectory_index`` selects the chunk. Start and end of the true path are
    marked, and one arrow per path shows the direction of travel near the
    middle of the segment.

    Args:
        trajectory_index: Zero-based index of the 100-row chunk to draw.
        results: Mapping with ``'predictions'`` and ``'true_values'`` arrays
            whose columns 0 and 1 are plotted as X and Y.

    Raises:
        IndexError: If the selected chunk contains no samples.
    """
    # Kept local rather than as a parameter: ipywidgets.interact would turn an
    # extra defaulted parameter into an additional widget.
    segment_length = 100

    predictions = results['predictions']
    true_values = results['true_values']

    start = trajectory_index * segment_length
    true_traj = true_values[start:start + segment_length]
    pred_traj = predictions[start:start + segment_length]

    n_points = len(true_traj)
    if n_points == 0:
        raise IndexError(
            f"Trajectory index {trajectory_index} is out of range: no samples at offset {start}"
        )

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(true_traj[:, 0], true_traj[:, 1], label='True', color='blue')
    ax.plot(pred_traj[:, 0], pred_traj[:, 1], label='Predicted', color='red', linestyle='--')
    ax.plot(true_traj[0, 0], true_traj[0, 1], 'go', markersize=8, label='Start')
    ax.plot(true_traj[-1, 0], true_traj[-1, 1], 'ro', markersize=8, label='End')

    # Direction arrows need two consecutive points; the clamp keeps mid+1 in
    # range for very short segments while leaving longer ones unchanged.
    if n_points >= 2:
        mid = min(n_points // 2, n_points - 2)
        ax.annotate('', xy=(true_traj[mid+1, 0], true_traj[mid+1, 1]),
                    xytext=(true_traj[mid, 0], true_traj[mid, 1]),
                    arrowprops=dict(facecolor='blue', shrink=0.05))
        ax.annotate('', xy=(pred_traj[mid+1, 0], pred_traj[mid+1, 1]),
                    xytext=(pred_traj[mid, 0], pred_traj[mid, 1]),
                    arrowprops=dict(facecolor='red', shrink=0.05))

    ax.set_title(f'Trajectory {trajectory_index}')
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.grid(True, linestyle='--', alpha=0.7)
    # Built last so the Start/End markers are included in the legend.
    ax.legend()

    plt.show()

# Create interactive plot
# The slider ranges over the complete 100-sample segments. It is clamped at 0 so
# that fewer than 100 predictions still give a valid slider (min <= max).
max_trajectories = max(len(results['predictions']) // 100 - 1, 0)

# The trailing semicolon suppresses the repr of interact's return value.
interact(plot_trajectory,
         trajectory_index=widgets.IntSlider(min=0, max=max_trajectories, step=1, description='Trajectory:'),
         results=fixed(results));


## **Save the Model**

In [None]:
# Tag the checkpoint with the host name and a timestamp
unique_id = f"{socket.gethostname()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

# Make sure the target folder exists before writing into it
_save_dir = 'saved_models'
os.makedirs(_save_dir, exist_ok=True)

model_path = os.path.join(_save_dir, f"model_{unique_id}.pth")
torch.save(model.state_dict(), model_path)
print(f"Model saved as '{model_path}'")