In [1]:
# Colab setup: %pip installs into the environment of the running kernel.
# Ray is pinned to the release this notebook was executed with (2.8.1); the tuning
# code below calls tune.report(...) with keyword metrics, and Ray's reporting API
# differs between releases. The extra is quoted so the shell cannot glob "[tune]".
%pip install -q torch torchvision opencv-python scikit-learn "ray[tune]==2.8.1" matplotlib


Collecting ray[tune]
  Downloading ray-2.8.1-cp310-cp310-manylinux2014_x86_64.whl (62.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.6/62.6 MB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
Collecting tensorboardX>=1.9 (from ray[tune])
  Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl (101 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.7/101.7 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorboardX, ray
Successfully installed ray-2.8.1 tensorboardX-2.6.2.2


In [2]:
# Mount the user's Google Drive into the Colab filesystem at /content/drive
# (Colab-only: google.colab is not available in other Jupyter environments).
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [3]:
import zipfile
import os

# Location of the dataset archive on the mounted Drive
# (the Colab file browser can be used to copy this path)
zip_file_path = '/content/drive/MyDrive/Dataset_Potholes/Pothole_Images (2)-20231121T011908Z-001.zip'

# Directory that receives the extracted files
destination_folder = '/content/'

# Make sure the target directory is present before extracting into it
if not os.path.exists(destination_folder):
    os.makedirs(destination_folder)

# Unpack every member of the archive into the target directory
with zipfile.ZipFile(zip_file_path) as archive:
    archive.extractall(path=destination_folder)

print(f"Files extracted to: {destination_folder}")

Files extracted to: /content/


In [None]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torchvision import transforms
import ray
from ray import tune
import matplotlib.pyplot as plt

# Constants
input_size = 224  # Side length in pixels that every image is resized to
# Binary classification (pothole or not). The training code in this notebook uses
# nn.CrossEntropyLoss and picks the predicted class with an argmax over the logits,
# which needs one logit per class: with a single output the loss is identically
# zero and the prediction is always class 0, so two outputs are required.
num_classes = 2

# Custom dataset class
class CustomDataset(Dataset):
    """Map-style dataset that loads images from disk and pairs them with labels.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of labels, aligned index-by-index with ``image_paths``.
        transform: Optional callable applied to the resized RGB image array.

    Each item is ``(image, label)``. The image is read with OpenCV, converted
    from BGR to RGB and resized to ``input_size`` x ``input_size`` (module-level
    constant) before ``transform`` is applied.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load, convert and resize the image at ``idx`` and return it with its label.

        Raises:
            OSError: If OpenCV cannot read the file (missing, unreadable or not
                a decodable image).
        """
        image_path = self.image_paths[idx]
        image = cv2.imread(image_path)
        # cv2.imread signals failure by returning None instead of raising; fail
        # here with the offending path rather than with an opaque error inside
        # cvtColor (often surfaced from a DataLoader worker).
        if image is None:
            raise OSError(f"cv2.imread could not read image: {image_path!r}")

        # OpenCV decodes to BGR channel order; convert to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (input_size, input_size))

        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

# Load dataset
dataset_folder = "/content/Pothole_Images (2)"


def _read_class_id(annotation_path):
    """Return the integer class id at the start of an annotation file's first line.

    Returns None when the file is empty or its first token is not a plain
    non-negative integer.
    """
    with open(annotation_path, 'r') as file:
        first_line = file.readline()
    parts = first_line.strip().split()
    if parts and parts[0].isdigit():
        return int(parts[0])
    return None


def _collect_labelled_images(folder):
    """Return aligned lists (image paths, annotation paths, labels) for `folder`.

    Every '.jpg' is paired with the '.txt' of the same file stem instead of by
    list position: os.listdir order is arbitrary and the two listings can differ
    in length, so positional pairing can attach a label to the wrong image.
    Images with a missing, empty or unparsable annotation are skipped.
    NOTE(review): assumes annotations share the image's stem (img1.jpg / img1.txt)
    -- confirm against the dataset layout.
    """
    paths, annotations, class_ids = [], [], []
    for file_name in sorted(os.listdir(folder)):
        if not file_name.endswith('.jpg'):
            continue
        stem = os.path.splitext(file_name)[0]
        annotation_path = os.path.join(folder, stem + '.txt')
        if not os.path.isfile(annotation_path):
            continue
        class_id = _read_class_id(annotation_path)
        if class_id is None:
            continue
        paths.append(os.path.join(folder, file_name))
        annotations.append(annotation_path)
        class_ids.append(class_id)
    return paths, annotations, class_ids


def _channel_mean_std(loader):
    """Return exact per-channel (mean, std) over all pixels yielded by `loader`.

    Expects batches shaped (batch, channels, height, width). Sums and squared
    sums are accumulated in float64 so the result does not depend on the batch
    size and a final batch holding a single image is handled like any other.
    """
    channel_sum = torch.zeros(3, dtype=torch.float64)
    channel_sq_sum = torch.zeros(3, dtype=torch.float64)
    pixel_count = 0
    for inputs, _ in loader:
        inputs = inputs.double()
        channel_sum += inputs.sum(dim=(0, 2, 3))
        channel_sq_sum += (inputs ** 2).sum(dim=(0, 2, 3))
        pixel_count += inputs.size(0) * inputs.size(2) * inputs.size(3)
    if pixel_count == 0:
        raise RuntimeError("Cannot compute normalization statistics from an empty loader")
    mean = channel_sum / pixel_count
    variance = (channel_sq_sum / pixel_count - mean ** 2).clamp(min=0.0)
    # Keep std strictly positive: transforms.Normalize rejects a zero std.
    std = variance.sqrt().clamp(min=1e-8)
    return mean.float(), std.float()


# Load annotations and keep only images that have a valid label
image_paths, annotation_files, labels = _collect_labelled_images(dataset_folder)
if not image_paths:
    raise RuntimeError(f"No labelled .jpg images found in {dataset_folder!r}")

# Split dataset (80% train, 10% validation, 10% test)
X_train, X_temp, y_train, y_temp = train_test_split(image_paths, labels, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Calculate mean and std on the training split only, so no validation/test
# statistics leak into the normalization applied during training.
stats_dataset = CustomDataset(X_train, labels=y_train, transform=transforms.Compose([transforms.ToTensor()]))
stats_loader = DataLoader(stats_dataset, batch_size=32, shuffle=False)
train_mean, train_std = _channel_mean_std(stats_loader)

print(f"Mean: {train_mean}, Std: {train_std}")

# Data augmentation and normalization
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=train_mean.tolist(), std=train_std.tolist()),
    # Add more augmentations if needed
])

# Create DataLoader
train_dataset = CustomDataset(X_train, y_train, transform=data_transform)
val_dataset = CustomDataset(X_val, y_val, transform=data_transform)
test_dataset = CustomDataset(X_test, y_test, transform=data_transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Model
class MiniYOLOTransfer(nn.Module):
    """Small CNN classifier: two conv/ReLU/max-pool stages and one linear layer.

    Each stage halves the spatial size, so the linear layer is sized for
    ``input_size // 4`` pixels per side (``input_size`` is the module-level
    constant) with 32 channels.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Build the stages in order: (3 -> 16) then (16 -> 32) channels.
        stages = []
        in_channels = 3
        for out_channels in (16, 32):
            stages.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1))
            stages.append(nn.ReLU(inplace=True))
            stages.append(nn.MaxPool2d(kernel_size=2, stride=2))
            in_channels = out_channels
        # Add more layers as needed
        self.features = nn.Sequential(*stages)

        pooled_side = input_size // 4
        self.fc = nn.Linear(32 * pooled_side * pooled_side, num_classes)

    def forward(self, x):
        """Return the logits for a batch of images."""
        feature_maps = self.features(x)
        flattened = feature_maps.view(feature_maps.size(0), -1)
        return self.fc(flattened)


In [None]:
# Hyperparameter tuning using Raytune
def train_model(config):
    """Ray Tune trainable: train a MiniYOLOTransfer and report metrics every epoch.

    Args:
        config: Trial configuration dict. Required keys: "lr", "momentum" and
            "weight_decay" (SGD hyperparameters). Optional key: "num_epochs"
            (number of training epochs, defaults to 10).

    After each epoch the mean training loss over that epoch's batches and the
    validation accuracy are reported via ``tune.report`` under the metric names
    "loss" and "accuracy". Data comes from the notebook-level ``train_loader``
    and ``val_loader``.
    """
    # Resolve the device inside the trial so the function does not depend on a
    # notebook global being defined before tuning starts.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    epochs = config.get("num_epochs", 10)

    model = MiniYOLOTransfer(num_classes).to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"], momentum=config["momentum"], weight_decay=config["weight_decay"])
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        model.train()
        running_loss, batch_count = 0.0, 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            batch_count += 1

        # Validation loop after each epoch
        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Report the epoch mean rather than whichever batch came last, and
        # avoid dividing by zero when a loader yields no batches.
        epoch_loss = running_loss / batch_count if batch_count else float("nan")
        accuracy = correct / total if total else 0.0
        tune.report(loss=epoch_loss, accuracy=accuracy)



In [None]:
# Compute device shared by the tuning trials and the final training run
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
use_gpu = device.type == "cuda"

# The epoch count is needed while tuning runs, so it is defined up front and
# also passed to every trial through the config.
num_epochs = 10  # You can adjust this if needed


def _evaluate(model, loader, criterion):
    """Return (mean batch loss, accuracy) of `model` over `loader`, without gradients.

    Both values are NaN when the loader yields no batches.
    """
    model.eval()
    loss_sum, batch_count, correct, total = 0.0, 0, 0, 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss_sum += criterion(outputs, labels).item()
            batch_count += 1
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    mean_loss = loss_sum / batch_count if batch_count else float("nan")
    accuracy = correct / total if total else float("nan")
    return mean_loss, accuracy


# Ray initialization; only advertise a GPU when one is actually available
ray.init(num_gpus=1 if use_gpu else 0, ignore_reinit_error=True)

# Define hyperparameter search space
config_space = {
    "lr": tune.loguniform(1e-4, 1e-1),
    "momentum": tune.uniform(0.1, 0.9),
    "weight_decay": tune.loguniform(1e-5, 1e-2),
    "num_epochs": num_epochs,
}

# Run hyperparameter tuning (a fifth of the GPU per trial, or one CPU without a GPU)
analysis = tune.run(
    train_model,
    config=config_space,
    num_samples=10,
    resources_per_trial={"gpu": 0.2} if use_gpu else {"cpu": 1},
)

# Get best hyperparameters
best_config = analysis.get_best_config(metric="accuracy", mode="max")
print("Best Hyperparameters:", best_config)

# Training loop with loss tracking
train_losses = []
val_losses = []

model = MiniYOLOTransfer(num_classes).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=best_config["lr"], momentum=best_config["momentum"], weight_decay=best_config["weight_decay"])
criterion = nn.CrossEntropyLoss()

for epoch in range(num_epochs):
    model.train()
    running_loss, batch_count = 0.0, 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        batch_count += 1

    # Validation after each epoch
    val_loss, val_accuracy = _evaluate(model, val_loader, criterion)

    # Track the mean loss over the epoch, not just the final batch, so the
    # training curve is comparable with the averaged validation loss.
    train_losses.append(running_loss / batch_count if batch_count else float("nan"))
    val_losses.append(val_loss)

# Plotting
plt.plot(range(1, num_epochs + 1), train_losses, label='Training Loss')
plt.plot(range(1, num_epochs + 1), val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Evaluate on test data
_, test_accuracy = _evaluate(model, test_loader, criterion)

print(f"Test Accuracy: {test_accuracy}")

# Shutdown Ray
ray.shutdown()
