In [7]:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import torchvision.transforms as transforms
from PIL import Image
import cv2

class PedestrianDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform
        self.label_mapping = {"Low": 0, "Medium": 1, "High": 2}

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_path = self.dataframe.iloc[idx]["unlabeled_image_path"]
        label = self.label_mapping[self.dataframe.iloc[idx]["risk_level"]]

        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(image)

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(label)

def load_datasets(csv_path, batch_size=32):
    df = pd.read_csv(csv_path).sample(frac=1).reset_index(drop=True)
    df = df.drop(["labeled_image_path", "pedestrian_pixels"], axis=1)
    
    train_df = df[:int(0.8 * len(df))]
    val_df = df[int(0.8 * len(df)):int(0.9 * len(df))]
    test_df = df[int(0.9 * len(df)):]

    transform = transforms.Compose([
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])

    train_dataset = PedestrianDataset(train_df, transform=transform)
    val_dataset = PedestrianDataset(val_df, transform=transform)
    test_dataset = PedestrianDataset(test_df, transform=transform)

    return (
        DataLoader(train_dataset, batch_size=batch_size, shuffle=True),
        DataLoader(val_dataset, batch_size=batch_size, shuffle=False),
        DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    )

In [8]:
train_loader, val_loader, test_loader = load_datasets("pedestrian_risk_analysis.csv")

In [9]:
import torch
import torch.nn as nn

In [10]:
class PedestrianCNN(nn.Module):
    def __init__(self):
        super(PedestrianCNN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.fc_layers = nn.Sequential(
            nn.Linear(128 * 16 * 16, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 3)  
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        return x

def get_model(device):
    model = PedestrianCNN().to(device)
    return model

In [11]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [16]:
import torch
import torch.optim as optim
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PedestrianCNN().to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
_loss_plot_test=[]

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss, correct = 0, 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        correct += (outputs.argmax(1) == labels).sum().item()
    
    avg_loss = total_loss/ len(train_loader.dataset)
    train_accuracy = correct / len(train_loader.dataset)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}, Accuracy: {train_accuracy:.4f}")
    with torch.no_grad():
        prediction, truth = zip(*[(model(x.to(device)), y.to(device)) for x, y in val_loader])
        prediction = torch.cat(prediction, dim=0)
        truth = torch.cat(truth, dim=0)
        val_loss = loss_fn(prediction, truth)
        _loss_plot_test.append(val_loss.item())
        
    print(f"Epoch [{epoch+1}/{num_epochs}] - Loss: {avg_loss:.4f} Val Loss: {val_loss:.4f}")

Epoch [1/10], Loss: 186.4320, Accuracy: 0.6951
Epoch [1/10] - Loss: 0.0216 Val Loss: 0.5028
Epoch [2/10], Loss: 112.0870, Accuracy: 0.8316
Epoch [2/10] - Loss: 0.0130 Val Loss: 0.4321
Epoch [3/10], Loss: 83.5350, Accuracy: 0.8782
Epoch [3/10] - Loss: 0.0097 Val Loss: 0.2994
Epoch [4/10], Loss: 66.7009, Accuracy: 0.8977
Epoch [4/10] - Loss: 0.0077 Val Loss: 0.3367
Epoch [5/10], Loss: 58.6261, Accuracy: 0.9152
Epoch [5/10] - Loss: 0.0068 Val Loss: 0.2625
Epoch [6/10], Loss: 51.6673, Accuracy: 0.9205
Epoch [6/10] - Loss: 0.0060 Val Loss: 0.2751
Epoch [7/10], Loss: 47.3195, Accuracy: 0.9276
Epoch [7/10] - Loss: 0.0055 Val Loss: 0.2105
Epoch [8/10], Loss: 43.5138, Accuracy: 0.9335
Epoch [8/10] - Loss: 0.0051 Val Loss: 0.2623
Epoch [9/10], Loss: 39.0884, Accuracy: 0.9419
Epoch [9/10] - Loss: 0.0045 Val Loss: 0.2436
Epoch [10/10], Loss: 34.7891, Accuracy: 0.9460
Epoch [10/10] - Loss: 0.0040 Val Loss: 0.2811
