In [90]:
import os
import torch
from torch.utils.data import DataLoader, Dataset
import math
import numpy as np

class SpeedDataset(Dataset):
    def __init__(self, directory, sequence_length=5):
        self.data = []
        self.sequence_length = sequence_length
        self.num_classes = (105 - 30) // 10 + 1  # Classes from 30-39, ..., 100-105
        self.preprocess_data(directory)
        self.normalize_features()

    def preprocess_data(self, directory):
        for filename in os.listdir(directory):
            if filename.endswith('.txt'):
                speed = float(filename.split('_')[-1].replace('.txt', ''))
                speed_class = 0 if speed < 30 else (int(speed) - 30) // 10
                filepath = os.path.join(directory, filename)
                with open(filepath, 'r') as file:
                    track_data = {}
                    for line in file:
                        frame, track_id, x1, y1, x2, y2 = map(float, line.strip().split(',')[:6])
                        if track_id not in track_data:
                            track_data[track_id] = []
                        track_data[track_id].append([x1, y1, x2, y2])

                    for track_id, frames in track_data.items():
                        if len(frames) >= self.sequence_length:
                            features = self.extract_features(frames)
                            overlap = 10  # Set overlap for sequences
                            for start_idx in range(0, len(features) - self.sequence_length + 1, self.sequence_length - overlap):
                                end_idx = start_idx + self.sequence_length
                                sequence = features[start_idx:end_idx]
                                self.data.append((sequence, speed_class))

    def extract_features(self, frames):
        features = []
        for i in range(1, len(frames)):
            current_frame = frames[i]
            previous_frame = frames[i-1]

            x1, y1, x2, y2 = current_frame
            px1, py1, px2, py2 = previous_frame

            width, height = x2 - x1, y2 - y1
            p_width, p_height = px2 - px1, py2 - py1

            # Calculate basic dimensions and changes
            width_change = width - p_width
            height_change = height - p_height
            area_change = (width * height) - (p_width * p_height)
            perimeter_change = (2 * (width + height)) - (2 * (p_width + p_height))

            # Movement calculations
            center_x, center_y = (x1 + x2) / 2, (y1 + y2) / 2
            p_center_x, p_center_y = (px1 + px2) / 2, (py1 + py2) / 2
            center_x_change = center_x - p_center_x
            center_y_change = center_y - p_center_y
            distance_moved = math.sqrt(center_x_change ** 2 + center_y_change ** 2)

            # Velocity and acceleration
            velocity = distance_moved  # Assuming frame rate constant
            acceleration = velocity - math.sqrt((p_center_x - px1 + px2) ** 2 + (p_center_y - py1 + py2) ** 2)

            feature_vector = [
                width, height,  p_width, p_height, width_change, height_change, area_change, perimeter_change,
                center_x, center_y, center_x_change, center_y_change, distance_moved, velocity, acceleration
            ]
            features.append(feature_vector)
        return features

    def normalize_features(self):
        all_features = [feature for sequence, _ in self.data for feature in sequence]
        all_features = np.array(all_features)
        self.mean = np.mean(all_features, axis=0)
        self.std = np.std(all_features, axis=0)

        for i, (sequence, speed_class) in enumerate(self.data):
            normalized_sequence = (sequence - self.mean) / (self.std + 1e-6)  # Prevent division by zero
            self.data[i] = (normalized_sequence, speed_class)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        inputs, output = self.data[idx]
        return torch.tensor(inputs, dtype=torch.float32), torch.tensor(output, dtype=torch.long)


In [91]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=0.1)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class SpeedPredictor(nn.Module):
    def __init__(self, sequence_length, feature_size, hidden_dim, output_size):
        super(SpeedPredictor, self).__init__()
        self.embedding = nn.Conv1d(in_channels=feature_size, out_channels=hidden_dim, kernel_size=1)
        self.pos_encoder = PositionalEncoding(hidden_dim)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=8, dim_feedforward=hidden_dim * 4, dropout=0.1)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=10)
        self.fc1 = nn.Linear(hidden_dim, 256)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(256, output_size)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.embedding(x)
        x = x.permute(0, 2, 1)
        x = x.permute(1, 0, 2)  
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = x.permute(1, 0, 2)  # Switch back to (batch, seq_len, features)
        x = x[:, -1, :]  # Only use the last sequence output for prediction
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

def init_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.kaiming_normal_(m.weight)
        nn.init.constant_(m.bias, 0)


In [92]:
import torch

# Check for CUDA
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


Using device: cuda


In [93]:

def train(model, train_loader, test_loader, criterion, optimizer, scheduler, epochs):
    for epoch in range(epochs):
        model.train()
        Train_total_loss = 0
        correct_train = 0
        total_train = 0

        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)  # Ensure inputs and targets are on the same device as model
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()  # First, update the parameters with the current learning rate
            
            Train_total_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            correct_train += (predicted == targets).sum().item()
            total_train += targets.size(0)
        
        scheduler.step()  # After optimizer updates, adjust the learning rate

        if epoch % 20 == 0 or epoch == epochs - 1:
            model.eval()
            correct_test = 0
            total_test = 0
            total_loss = 0
            with torch.no_grad():
                for inputs, targets in test_loader:
                    inputs, targets = inputs.to(device), targets.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, targets)
                    total_loss += loss.item()
                    _, predicted = torch.max(outputs.data, 1)
                    correct_test += (predicted == targets).sum().item()
                    total_test += targets.size(0)
                    
            train_accuracy = 100 * correct_train / total_train
            test_accuracy = 100 * correct_test / total_test
            print(f'Epoch {epoch+1}: Train Loss: {Train_total_loss / len(train_loader)} Test Loss: {total_loss / len(train_loader)}, '
                  f'Train Accuracy: {train_accuracy:.2f}%, Validation Accuracy: {test_accuracy:.2f}%')


In [94]:
import torch
from torch.utils.data import DataLoader, random_split
from torch.optim import Adam, SGD
from torch.optim.lr_scheduler import ExponentialLR

# Assuming your SpeedPredictor and SpeedDataset are already defined and imported

# Initialize datasets
train_dataset = SpeedDataset('outputs', sequence_length=15)
test_dataset = SpeedDataset('test_outputs', sequence_length=15)  
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

len(train_dataset)


7437

In [95]:
def extract_labels(data_loader):
    labels = []
    for _, y in data_loader:
        labels.append(y)
    return torch.cat(labels)  # Concatenate list of tensors into a single tensor

# Extract labels from the training DataLoader
train_labels = extract_labels(train_loader)

# Compute class weights using sklearn's compute_class_weight
from sklearn.utils.class_weight import compute_class_weight

# Convert train_labels to a numpy array and calculate class weights
class_weights = compute_class_weight(
    'balanced',
    classes=np.unique(train_labels.numpy()), 
    y=train_labels.numpy()
)

# Convert class weights to a tensor
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float, device=device)


In [97]:
# Model setup
max_classes = train_dataset.num_classes 
model = SpeedPredictor(sequence_length=15, feature_size=15, hidden_dim=128, output_size=max_classes)
model.to(device)  # Ensure model is on the appropriate device
model.apply(init_weights)
# Loss function and optimizer
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor).to(device)  # Ensure loss function is on the appropriate device
optimizer = Adam(model.parameters(), lr=0.0001)
scheduler = ExponentialLR(optimizer, gamma=0.99999999)  # Learning rate scheduler

# Train the model
train(model, train_loader, test_loader, criterion, optimizer, scheduler, epochs=200)


Epoch 1: Train Loss: 2.38696055738335 Test Loss: 0.17576475530608088, Train Accuracy: 12.01%, Validation Accuracy: 19.37%
Epoch 21: Train Loss: 1.1899929291162736 Test Loss: 0.1551542088516757, Train Accuracy: 50.02%, Validation Accuracy: 31.27%
Epoch 41: Train Loss: 1.0330657918229063 Test Loss: 0.16706899139616224, Train Accuracy: 56.60%, Validation Accuracy: 33.33%
Epoch 61: Train Loss: 0.9036241637335883 Test Loss: 0.17192517845039693, Train Accuracy: 61.05%, Validation Accuracy: 36.67%
Epoch 81: Train Loss: 0.8296754706619133 Test Loss: 0.19659593064560849, Train Accuracy: 64.77%, Validation Accuracy: 37.30%
Epoch 101: Train Loss: 0.7457376188702054 Test Loss: 0.23716315856346717, Train Accuracy: 68.23%, Validation Accuracy: 32.70%
Epoch 121: Train Loss: 0.7169507808155484 Test Loss: 0.2249861897056938, Train Accuracy: 67.82%, Validation Accuracy: 35.87%
Epoch 141: Train Loss: 0.6689314714863769 Test Loss: 0.251066512022263, Train Accuracy: 70.47%, Validation Accuracy: 39.84%
Epoc

In [98]:
def print_confusion_matrix_and_report(all_targets, all_preds):
    print(confusion_matrix(all_targets, all_preds))
    print(classification_report(all_targets, all_preds, target_names=[f'Class {30 + i * 10}-{39 + i * 10}' for i in range(max(all_targets) + 1)]))


In [100]:
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
def evaluate(model, test_loader):
    model.eval()
    correct_test = 0
    total_test = 0
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)  # Ensure inputs and targets are on the same device as model
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            correct_test += (predicted == targets).sum().item()
            total_test += targets.size(0)
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(targets.cpu().numpy())

    test_accuracy = 100 * correct_test / total_test
    print("Confusion Matrix and Classification Report:")
    print_confusion_matrix_and_report(all_targets, all_preds)
    return test_accuracy
evaluate(model, test_loader)  

Confusion Matrix and Classification Report:
[[88 13  3  4  0  0  0  0]
 [39 30 29 36  0  0  0  0]
 [ 3  7 19 49  0  0  0  0]
 [ 0  0 11 42 15  7  0  8]
 [ 0  0 10 40  8 18  7  8]
 [ 0  0  8  4 12  6 25 15]
 [ 0  0  1  4  2  3 39 12]
 [ 0  0  0  0  0  0  5  0]]
               precision    recall  f1-score   support

  Class 30-39       0.68      0.81      0.74       108
  Class 40-49       0.60      0.22      0.33       134
  Class 50-59       0.23      0.24      0.24        78
  Class 60-69       0.23      0.51      0.32        83
  Class 70-79       0.22      0.09      0.12        91
  Class 80-89       0.18      0.09      0.12        70
  Class 90-99       0.51      0.64      0.57        61
Class 100-109       0.00      0.00      0.00         5

     accuracy                           0.37       630
    macro avg       0.33      0.33      0.30       630
 weighted avg       0.40      0.37      0.35       630



36.82539682539682