# Tube Learning Notebook

## DataFrame Construction

In [None]:
import pandas as pd
import ast

In [None]:
# Step 1: Read the concatenated CSV file into a pandas DataFrame
filename = 'concatenated_trajectory_data.csv'
all_data = pd.read_csv(filename)

In [None]:
# Step 2: Convert necessary fields into appropriate data types
df['joint_positions'] = df['joint_positions'].apply(ast.literal_eval)
df['joint_velocities'] = df['joint_velocities'].apply(ast.literal_eval)

# Step 3: Consolidate columns into new structure
df['x_t'] = df.apply(lambda row: row['joint_positions'] + row['joint_velocities'], axis=1)
df['u_t'] = df.apply(lambda row: [row['velocity_x'], row['velocity_y']], axis=1)
df['z_t'] = df.apply(lambda row: [row['traj_x'], row['traj_y']], axis=1)
df['v_t'] = df.apply(lambda row: [row['reduced_command_x'], row['reduced_command_y']], axis=1)

# Step 4: Calculate the Euclidean distance between position and trajectory for w_t
df['w_t'] = np.sqrt((df['position_x'] - df['traj_x'])**2 + (df['position_y'] - df['traj_y'])**2)

# Step 5: Create the "group" column
df['group'] = df['episode_number'].astype(str) + '_' + df['robot_index'].astype(str)

# Step 6: Create x_{t+1}, z_{t+1}, and w_{t+1} by shifting the DataFrame within each group
df['x_{t+1}'] = df.groupby('group')['x_t'].shift(-1)
df['z_{t+1}'] = df.groupby('group')['z_t'].shift(-1)
df['w_{t+1}'] = df.groupby('group')['w_t'].shift(-1)

# Drop rows where we do not have x_{t+1}, z_{t+1}, and w_{t+1}
df.dropna(subset=['x_{t+1}', 'z_{t+1}', 'w_{t+1}'], inplace=True)

# Step 7: Drop the first and last 10 data points from each episode
def drop_edges(group):
    return group.iloc[10:-10]

df = df.groupby('group', group_keys=False).apply(drop_edges)

# Step 8: Select and order the columns
final_df = df[['group', 'x_t', 'u_t', 'z_t', 'v_t', 'w_t', 'x_{t+1}', 'z_{t+1}', 'w_{t+1}']]

We have $D=\{\omega_t, x_t, u_t, z_t, v_t, \omega_{t+1}, x_{t+1}, z_{t+1}\}$:

In [None]:
# Print the final DataFrame
print(final_df.head())

Optional Saving:

In [None]:
final_df.to_csv('processed_trajectory_data.csv', index=False)

## Network Construction

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from torch.utils.tensorboard import SummaryWriter
import ast

Load in the data:

In [None]:
# Load data (assuming DataFrame is saved in a CSV file named 'data.csv')
final_df = pd.read_csv('processed_trajectory_data.csv')

# Convert to tensors and split data
X = torch.tensor(final_df[['x_t', 'u_t', 'z_t', 'v_t']].values.tolist(), dtype=torch.float32)
y = torch.tensor(final_df[['w_t', 'w_{t+1}']].values, dtype=torch.float32)
dataset = TensorDataset(X, y[:, 1].unsqueeze(1))

train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64)

Model specifications:

In [None]:
class TubeWidthPredictor(nn.Module):
    def __init__(self):
        super(TubeWidthPredictor, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(16, 64),  # Adjust input size based on your data structure
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.network(x)

class AsymmetricLoss(nn.Module):
    def __init__(self, alpha=0.9, delta=1.0):
        super(AsymmetricLoss, self).__init__()
        self.alpha = alpha
        self.huber = nn.HuberLoss(delta=delta)

    def forward(self, y_pred, y_true):
        residual = y_true - y_pred
        loss = torch.where(residual > 0, self.alpha * residual, (1 - self.alpha) * residual.abs())
        return self.huber(loss, torch.zeros_like(loss))
    
def train(num_epochs):
    model.train()
    for epoch in range(num_epochs):
        for batch_idx, (data, targets) in enumerate(train_loader):
            optimizer.zero_grad()
            outputs = model(data)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            writer.add_scalar('Loss/train', loss.item(), epoch * len(train_loader) + batch_idx)

def test():
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for data, targets in test_loader:
            outputs = model(data)
            loss = criterion(outputs, targets)
            total_loss += loss.item()
    print(f'Test Loss: {total_loss / len(test_loader)}')

In [None]:
model = TubeWidthPredictor()
criterion = AsymmetricLoss(alpha=0.9)
optimizer = optim.Adam(model.parameters(), lr=0.001)

writer = SummaryWriter('runs/tube_width_experiment')
train(50)
test()
writer.close()