## Imports and Set up


In [14]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

df = pd.read_csv("flocking_250301.csv")


Using device: cpu


## Prepare Data

In [15]:
df["NeighborDiffSum"] = df["NeighborDiffSum"].apply(lambda x: eval(x) if isinstance(x, str) else x)
df["Angle"] = df["Angle"].astype(float)

"""
group data according to each time step
"""
grouped = df.groupby("Step").agg(list)
neighbor_diff_sum = np.array(grouped["NeighborDiffSum"].to_list())  # Shape: (num_steps, num_agents, 2)
angles = np.array(grouped["Angle"].to_list()).reshape(len(grouped), len(grouped.iloc[0]["Angle"]), 1)  # Shape: (num_steps, num_agents, 1)

delta_angle = np.zeros_like(angles)
delta_angle[1:] = angles[1:] - angles[:-1] # Shape: (num_steps, num_agents, 1)

grouped["deltaAngle"] = delta_angle.reshape(len(grouped), -1).tolist()

X_train, X_test, y_train, y_test = train_test_split(neighbor_diff_sum, delta_angle, test_size=0.2, random_state=42)

# to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

## Define Model

In [16]:
# define nn
hidden_size_1 = 16
hidden_size_2 = 8

model = nn.Sequential(
    nn.Linear(2, hidden_size_1),
    nn.ReLU(),
    nn.Linear(hidden_size_1, hidden_size_2),
    nn.ReLU(),
    nn.Linear(hidden_size_2, 1)
)

# define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

## Define Model

In [17]:
num_epochs = 500
for epoch in range(num_epochs):
    optimizer.zero_grad()
    outputs = model(X_train_tensor.view(-1, 2))
    loss = criterion(outputs, y_train_tensor.view(-1, 1))
    loss.backward()
    optimizer.step()

    if (epoch+1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

Epoch [10/500], Loss: 0.7340
Epoch [20/500], Loss: 0.5480
Epoch [30/500], Loss: 0.4447
Epoch [40/500], Loss: 0.3808
Epoch [50/500], Loss: 0.3366
Epoch [60/500], Loss: 0.3056
Epoch [70/500], Loss: 0.2848
Epoch [80/500], Loss: 0.2711
Epoch [90/500], Loss: 0.2634
Epoch [100/500], Loss: 0.2584
Epoch [110/500], Loss: 0.2546
Epoch [120/500], Loss: 0.2520
Epoch [130/500], Loss: 0.2499
Epoch [140/500], Loss: 0.2482
Epoch [150/500], Loss: 0.2468
Epoch [160/500], Loss: 0.2457
Epoch [170/500], Loss: 0.2448
Epoch [180/500], Loss: 0.2440
Epoch [190/500], Loss: 0.2434
Epoch [200/500], Loss: 0.2428
Epoch [210/500], Loss: 0.2424
Epoch [220/500], Loss: 0.2420
Epoch [230/500], Loss: 0.2416
Epoch [240/500], Loss: 0.2413
Epoch [250/500], Loss: 0.2410
Epoch [260/500], Loss: 0.2408
Epoch [270/500], Loss: 0.2407
Epoch [280/500], Loss: 0.2405
Epoch [290/500], Loss: 0.2404
Epoch [300/500], Loss: 0.2403
Epoch [310/500], Loss: 0.2403
Epoch [320/500], Loss: 0.2402
Epoch [330/500], Loss: 0.2401
Epoch [340/500], Lo

## Evaluate the model

In [18]:
with torch.no_grad():
    y_pred = model(X_test_tensor.view(-1, 2))
    test_loss = criterion(y_pred, y_test_tensor.view(-1, 1))
    print(f'Test Loss: {test_loss.item():.4f}')

Test Loss: 0.2610
