In [None]:
# Python code snippet to extract a .zip file in Google Colab
import zipfile
import os
from tqdm import tqdm

# Archives to unpack; each one is extracted next to itself
zip_files = [
    "/RDD2022_China_Drone.zip",
    "/RDD2022_China_MotorBike.zip",
    "/RDD2022_Czech.zip",
    "/RDD2022_India.zip",
]

# Unpack every archive that is present, reporting the ones that are missing
for zip_path in tqdm(zip_files, desc="Extracting ZIP files"):
    if os.path.exists(zip_path):
        # Target directory is the archive path without its ".zip" suffix
        extract_dir = os.path.splitext(zip_path)[0]
        os.makedirs(extract_dir, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        print(f"✅ Extracted: {zip_path} → {extract_dir}")
    else:
        print(f"❌ File not found: {zip_path}")



In [None]:
# --- Clean and install compatible versions ---
# Requirement specifiers containing ">" must be quoted: unquoted, the shell
# treats ">=0.16" as an output redirection, so pip would install the package
# without the version constraint and write its output to a file named "=0.16".
!pip install "jedi>=0.16"
!pip install -U pip setuptools wheel --quiet
!pip uninstall -y numpy jax jaxlib pytensor thinc --quiet


# Reinstall compatible dependencies
!pip install "numpy>=2.0"
!pip install ultralytics==8.2.90 opencv-python-headless==4.10.0.84 seaborn tqdm pyyaml --quiet


In [None]:
import json
import os
import random
import shutil
import sys
import xml.etree.ElementTree as ET
from collections import Counter
from pathlib import Path

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import yaml
from sklearn.manifold import TSNE
from sklearn.metrics import classification_report, roc_curve, auc
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# YOLOv8
from ultralytics import YOLO

print("✅ Environment ready — YOLOv8 and dependencies loaded successfully!")


In [None]:

# Define dataset paths
# Maps each RDD2022 subset name to the directories produced by the extraction
# cell. Only 'train_images' and 'train_annotations' are read by load_data();
# 'test_images' is recorded for the subsets that list one.
dataset_paths = {
    'India': {
        'train_images': '/RDD2022_India/India/train/images',
        'train_annotations': '/RDD2022_India/India/train/annotations',
        'test_images': '/RDD2022_India/India/test/images'
    },
    # NOTE(review): unlike the other subsets, these paths have no nested
    # '<subset>/' directory (e.g. '/RDD2022_Czech/Czech/...'). Confirm against
    # the extracted archive; load_data() skips the subset if the path is absent.
    'Czech': {
        'train_images': '/RDD2022_Czech/train/images',
        'train_annotations': '/RDD2022_Czech/train/annotations',
        'test_images': '/RDD2022_Czech/test/images'
    },
    'China_MotorBike': {
        'train_images': '/RDD2022_China_MotorBike/China_MotorBike/train/images',
        'train_annotations': '/RDD2022_China_MotorBike/China_MotorBike/train/annotations',
        'test_images': '/RDD2022_China_MotorBike/China_MotorBike/test/images'
    },
    # No 'test_images' entry is defined for this subset.
    'China_Drone': {
        'train_images': '/RDD2022_China_Drone/China_Drone/train/images',
        'train_annotations': '/RDD2022_China_Drone/China_Drone/train/annotations'
    }
}

In [None]:

# Damage categories kept from the annotations, and the integer id of each one
VALID_CLASSES = ['D00', 'D10', 'D20', 'D40', 'D43', 'D44']
class_to_idx = dict(zip(VALID_CLASSES, range(len(VALID_CLASSES))))

# Root of the YOLO-formatted dataset, with an images/ and labels/ folder per split
OUTPUT_DIR = '/content/pavement_damage_yolo'
os.makedirs(OUTPUT_DIR, exist_ok=True)
for _split in ('train', 'val', 'test'):
    for _kind in ('images', 'labels'):
        os.makedirs(f'{OUTPUT_DIR}/{_split}/{_kind}', exist_ok=True)


In [None]:

# Function to parse XML annotations and convert to YOLO format
def parse_xml_to_yolo(xml_path, img_width, img_height, class_map=None):
    """Read one XML annotation file and return its boxes in YOLO format.

    Each ``<object>`` element contributes one box when its ``<name>`` is a known
    class and its ``<bndbox>`` has a positive area inside the image.

    Args:
        xml_path: Path of the XML annotation file.
        img_width: Width in pixels of the annotated image.
        img_height: Height in pixels of the annotated image.
        class_map: Optional mapping from class name to integer id. When omitted,
            the notebook-level ``class_to_idx`` mapping is used.

    Returns:
        ``(boxes, labels)`` where ``boxes`` is a list of
        ``[center_x, center_y, width, height]`` lists normalised by the image
        size, and ``labels`` is the list of matching integer class ids.

    Raises:
        ValueError: If the image size is not positive, or a coordinate is not
            a number.
        TypeError: If a ``<bndbox>`` lacks one of its four coordinate tags.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    if img_width <= 0 or img_height <= 0:
        raise ValueError(f"Invalid image size {img_width}x{img_height} for {xml_path}")
    if class_map is None:
        class_map = class_to_idx

    root = ET.parse(xml_path).getroot()
    boxes = []
    labels = []

    for obj in root.findall('object'):
        # A missing or empty <name> is treated like an unknown class.
        cls_name = (obj.findtext('name') or '').strip()
        if cls_name not in class_map:
            continue  # Skip invalid class names

        bbox = obj.find('bndbox')
        if bbox is None:
            continue  # Object without a box cannot be converted

        xmin = float(bbox.findtext('xmin'))
        ymin = float(bbox.findtext('ymin'))
        xmax = float(bbox.findtext('xmax'))
        ymax = float(bbox.findtext('ymax'))

        # Clamp to the image so every normalised value stays within [0, 1].
        xmin = min(max(xmin, 0.0), img_width)
        xmax = min(max(xmax, 0.0), img_width)
        ymin = min(max(ymin, 0.0), img_height)
        ymax = min(max(ymax, 0.0), img_height)

        # Skip invalid bounding boxes (including ones entirely outside the image)
        if xmax <= xmin or ymax <= ymin:
            continue

        # Convert to YOLO format (center_x, center_y, width, height) normalized
        center_x = (xmin + xmax) / 2 / img_width
        center_y = (ymin + ymax) / 2 / img_height
        width = (xmax - xmin) / img_width
        height = (ymax - ymin) / img_height

        boxes.append([center_x, center_y, width, height])
        labels.append(class_map[cls_name])

    return boxes, labels


In [None]:

# Function to load and preprocess dataset
def load_data():
    """Collect every training image that has a readable XML annotation.

    Walks the ``train_images`` / ``train_annotations`` directories of each
    entry in ``dataset_paths``. Annotations are expected in an ``xmls``
    sub-directory, named after the image file without its extension.

    Returns:
        ``(all_images, all_labels)``: a list of image paths and a parallel list
        of ``(boxes, labels, xml_path)`` tuples as produced by
        ``parse_xml_to_yolo``. Images whose annotation has no valid box are
        kept, with empty ``boxes`` and ``labels``, as background examples.
    """
    all_images = []
    all_labels = []

    for dataset_name, paths in dataset_paths.items():
        img_dir = paths['train_images']
        ann_dir = paths['train_annotations']

        if not os.path.exists(img_dir) or not os.path.exists(ann_dir):
            print(f"Skipping {dataset_name}: Directory not found")
            continue

        # Sorted so the seeded train/val/test split sees the same order on every run.
        for img_file in sorted(os.listdir(img_dir)):
            if not img_file.endswith(('.jpg', '.png')):
                continue
            img_path = os.path.join(img_dir, img_file)
            # splitext keeps dots that are part of the file name itself.
            stem = os.path.splitext(img_file)[0]
            xml_path = os.path.join(ann_dir, 'xmls', f"{stem}.xml")

            if not os.path.exists(xml_path):
                continue

            # Read image to get dimensions
            img = cv2.imread(img_path)
            if img is None:
                continue
            h, w = img.shape[:2]

            # Parse annotations; a file that cannot be parsed is reported and skipped
            try:
                boxes, labels = parse_xml_to_yolo(xml_path, w, h)
            except Exception as e:
                print(f"Skipping {xml_path} due to error: {str(e)}")
                continue

            # Images with no valid boxes are included as background.
            all_images.append(img_path)
            all_labels.append((boxes, labels, xml_path))

    print(f"Dataset initialized with {len(all_images)} valid image-annotation pairs.")
    return all_images, all_labels


In [None]:

# Function to preprocess and save images/labels in YOLO format
def preprocess_and_save(images, labels, split='train'):
    """Write resized images and their YOLO label files for one dataset split.

    Args:
        images: Image paths to process.
        labels: Parallel sequence of ``(boxes, class_ids, xml_path)`` tuples;
            ``boxes`` hold normalised ``[center_x, center_y, width, height]``.
        split: Name of the sub-directory of ``OUTPUT_DIR`` to write into.

    Each image is resized to 640x640 and saved under ``<split>/images``; a text
    file with the same stem is written under ``<split>/labels`` with one
    ``"<class_id> <cx> <cy> <w> <h>"`` line per box (empty when there are none).
    Images that cannot be read or written are reported and skipped.
    """
    img_out_dir = f'{OUTPUT_DIR}/{split}/images'
    lbl_out_dir = f'{OUTPUT_DIR}/{split}/labels'
    # Create the targets here so the function also works for a new split name.
    os.makedirs(img_out_dir, exist_ok=True)
    os.makedirs(lbl_out_dir, exist_ok=True)

    for img_path, (boxes, lbls, xml_path) in zip(images, labels):
        # Copy and preprocess image
        img = cv2.imread(img_path)
        if img is None:
            print(f"Skipping {img_path}: image could not be read")
            continue
        # Boxes are normalised, so they stay valid after the resize.
        img = cv2.resize(img, (640, 640))  # Resize to 640x640 for YOLO
        img_name = os.path.basename(img_path)
        if not cv2.imwrite(os.path.join(img_out_dir, img_name), img):
            print(f"Skipping {img_path}: image could not be written")
            continue

        # Save YOLO annotations; splitext keeps dots that belong to the file name
        stem = os.path.splitext(img_name)[0]
        lbl_path = os.path.join(lbl_out_dir, f"{stem}.txt")
        with open(lbl_path, 'w') as f:
            for box, lbl in zip(boxes, lbls):
                coords = ' '.join(f"{value:.6f}" for value in box)
                f.write(f"{lbl} {coords}\n")

# Function to split dataset
def split_dataset(images, labels):
    """Split the data 60/20/20 into train/val/test and write each split to disk.

    Args:
        images: Image paths.
        labels: Parallel sequence of annotation tuples.

    Returns:
        ``(n_train, n_val, n_test)``: the number of images in each split.
    """
    # First hold out 20% of everything as the test set ...
    rest_imgs, test_imgs, rest_lbls, test_lbls = train_test_split(
        images, labels, test_size=0.2, random_state=42
    )
    # ... then take 25% of the remainder for validation (0.25 * 0.8 = 0.2).
    train_imgs, val_imgs, train_lbls, val_lbls = train_test_split(
        rest_imgs, rest_lbls, test_size=0.25, random_state=42
    )

    subsets = {
        'train': (train_imgs, train_lbls),
        'val': (val_imgs, val_lbls),
        'test': (test_imgs, test_lbls),
    }
    for split_name, (subset_imgs, subset_lbls) in subsets.items():
        preprocess_and_save(subset_imgs, subset_lbls, split_name)

    return tuple(len(subsets[split_name][0]) for split_name in ('train', 'val', 'test'))

# Function to create YOLO config file
def create_yolo_config():
    """Write ``data.yaml`` into ``OUTPUT_DIR``.

    The file records the dataset root, the image folder of each split, the
    number of classes and the class names taken from ``VALID_CLASSES``.
    """
    yaml_lines = [
        "",
        f"path: {OUTPUT_DIR}",
        "train: train/images",
        "val: val/images",
        "test: test/images",
        "",
        f"nc: {len(VALID_CLASSES)}",
        f"names: {VALID_CLASSES}",
        "",
    ]
    config_path = f'{OUTPUT_DIR}/data.yaml'
    with open(config_path, 'w') as handle:
        handle.write("\n".join(yaml_lines))

# Function to build and train YOLO model
def train_model(weights='yolov8n.pt', epochs=50, imgsz=640, batch=16, name='pavement_damage'):
    """Load a YOLO checkpoint and train it on the dataset in ``OUTPUT_DIR``.

    Args:
        weights: Checkpoint passed to ``YOLO``; defaults to the nano model.
        epochs: Number of training epochs.
        imgsz: Training image size.
        batch: Batch size.
        name: Run name passed to ``model.train``.

    Returns:
        ``(model, results)``: the ``YOLO`` object and whatever
        ``model.train`` returned.
    """
    model = YOLO(weights)  # Pretrained YOLOv8n (nano) by default, for speed
    results = model.train(
        data=f'{OUTPUT_DIR}/data.yaml',
        epochs=epochs,
        imgsz=imgsz,
        batch=batch,
        # Use the first GPU when one is available, otherwise fall back to CPU.
        device=0 if torch.cuda.is_available() else 'cpu',
        name=name,
    )
    return model, results


In [None]:

# Function to evaluate model
def _load_yolo_boxes(img_path, img_w, img_h):
    """Return ``[(class_id, (x1, y1, x2, y2)), ...]`` in pixels for one image.

    The label file is looked up in the ``labels`` directory next to the image's
    own directory, under the image's stem with a ``.txt`` extension. A missing
    file is treated as an image with no objects.
    """
    img_dir, img_name = os.path.split(img_path)
    label_path = os.path.join(
        os.path.dirname(img_dir), 'labels', os.path.splitext(img_name)[0] + '.txt'
    )
    if not os.path.exists(label_path):
        return []

    objects = []
    with open(label_path) as f:
        for line in f:
            parts = line.split()
            if len(parts) != 5:
                continue  # Blank or malformed line
            cls_id = int(float(parts[0]))
            cx, cy, bw, bh = (float(value) for value in parts[1:])
            objects.append((
                cls_id,
                ((cx - bw / 2) * img_w, (cy - bh / 2) * img_h,
                 (cx + bw / 2) * img_w, (cy + bh / 2) * img_h),
            ))
    return objects


def _iou_xyxy(box_a, box_b):
    """Intersection-over-union of two ``(x1, y1, x2, y2)`` boxes."""
    inter_w = min(box_a[2], box_b[2]) - max(box_a[0], box_b[0])
    inter_h = min(box_a[3], box_b[3]) - max(box_a[1], box_b[1])
    if inter_w <= 0 or inter_h <= 0:
        return 0.0
    inter = inter_w * inter_h
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0


def evaluate_model(model, test_images, conf=0.5, iou_thr=0.5):
    """Compute box-level precision, recall, F1 and ROC AUC on the test images.

    Ground truth is read from the YOLO label file that belongs to each image.
    Within an image, predictions are visited in order of decreasing confidence
    and each is paired with the unmatched ground-truth box of highest IoU,
    provided that IoU is at least ``iou_thr``. Every pair, every unmatched
    prediction (truth = background) and every unmatched ground-truth box
    (prediction = background, score 0) contributes one aligned sample. An image
    with neither boxes nor predictions contributes one background/background
    sample.

    Args:
        model: Object whose ``predict(img, conf=...)`` returns results exposing
            ``boxes`` with ``cls``, ``conf`` and ``xyxy``.
        test_images: Paths of the images to evaluate.
        conf: Confidence threshold passed to ``model.predict``.
        iou_thr: Minimum IoU for a prediction to match a ground-truth box.

    Returns:
        ``(precision, recall, f1, auc_score)``. ``auc_score`` is ``nan`` when
        the samples contain only objects or only background.

    Raises:
        ValueError: If no sample could be collected.
    """
    background = len(VALID_CLASSES)  # Extra class id used for "no object"
    y_true = []
    y_pred = []
    y_scores = []

    for img_path in test_images:
        img = cv2.imread(img_path)
        if img is None:
            print(f"Skipping {img_path}: image could not be read")
            continue
        results = model.predict(img, conf=conf)

        # Get ground truth
        truths = _load_yolo_boxes(img_path, img.shape[1], img.shape[0])

        # Get predictions, most confident first so they claim matches first
        preds = []
        for box in results[0].boxes:
            preds.append((int(box.cls), float(box.conf), tuple(box.xyxy[0].tolist())))
        preds.sort(key=lambda item: item[1], reverse=True)

        matched = set()
        for pred_cls, score, pred_box in preds:
            best_iou, best_idx = 0.0, None
            for idx, (_, gt_box) in enumerate(truths):
                if idx in matched:
                    continue
                iou = _iou_xyxy(pred_box, gt_box)
                if iou > best_iou:
                    best_iou, best_idx = iou, idx

            if best_idx is not None and best_iou >= iou_thr:
                matched.add(best_idx)
                y_true.append(truths[best_idx][0])
            else:
                y_true.append(background)  # False positive
            y_pred.append(pred_cls)
            y_scores.append(score)

        # Ground-truth boxes nobody claimed are missed detections
        for idx, (gt_cls, _) in enumerate(truths):
            if idx not in matched:
                y_true.append(gt_cls)
                y_pred.append(background)
                y_scores.append(0.0)

        if not truths and not preds:
            y_true.append(background)
            y_pred.append(background)
            y_scores.append(0.0)

    if not y_true:
        raise ValueError("No evaluation samples: test_images is empty or unreadable")

    # Compute metrics
    precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
    f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")

    # ROC Curve and AUC (object vs. background, scored by detection confidence)
    y_true_bin = [1 if x != background else 0 for x in y_true]
    if len(set(y_true_bin)) < 2:
        # ROC needs both positives and negatives.
        print("ROC curve is undefined: samples contain a single class only")
        return precision, recall, f1, float('nan')

    fpr, tpr, _ = roc_curve(y_true_bin, np.array(y_scores), pos_label=1)
    auc_score = auc(fpr, tpr)

    plt.figure()
    plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc_score:.2f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend()
    plt.savefig(f'{OUTPUT_DIR}/roc_curve.png')
    plt.show()

    return precision, recall, f1, auc_score


In [None]:

# Function to visualize training curves
def visualize_training_curves(results):
    """Plot per-epoch loss and mAP@0.5 from the training run's ``results.csv``.

    The per-epoch history is read from ``results.csv`` inside
    ``results.save_dir``. Training and validation loss are each the sum of all
    ``train/*_loss`` and ``val/*_loss`` columns. The figure is saved to
    ``OUTPUT_DIR/training_curves.png`` and shown.

    Args:
        results: Object returned by ``model.train``; it must expose the run
            directory as ``save_dir``.

    Raises:
        ValueError: If ``results`` has no ``save_dir``.
        FileNotFoundError: If ``results.csv`` is missing from that directory.
        KeyError: If the expected loss or mAP columns are absent.
    """
    save_dir = getattr(results, 'save_dir', None)
    if save_dir is None:
        raise ValueError("results has no save_dir; cannot locate results.csv")
    csv_path = os.path.join(str(save_dir), 'results.csv')
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Training history not found: {csv_path}")

    history = pd.read_csv(csv_path)
    # Header cells may be padded with spaces; strip them before lookup.
    history.columns = [col.strip() for col in history.columns]

    train_cols = [c for c in history.columns if c.startswith('train/') and c.endswith('_loss')]
    val_cols = [c for c in history.columns if c.startswith('val/') and c.endswith('_loss')]
    # The "(" keeps 'metrics/mAP50-95(...)' from matching.
    map_cols = [c for c in history.columns if c.startswith('metrics/mAP50(')]
    if not train_cols or not val_cols or not map_cols:
        raise KeyError(f"Expected loss/mAP columns not found in {csv_path}")

    epochs = range(1, len(history) + 1)

    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, history[train_cols].sum(axis=1), label='Training Loss')
    plt.plot(epochs, history[val_cols].sum(axis=1), label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs, history[map_cols[0]], label='mAP@0.5')
    plt.xlabel('Epoch')
    plt.ylabel('mAP@0.5')
    plt.title('Validation mAP@0.5')
    plt.legend()

    plt.savefig(f'{OUTPUT_DIR}/training_curves.png')
    plt.show()


In [None]:

# Function to visualize sample predictions
def visualize_predictions(model, test_images, n_samples=5):
    """Show random test images with ground truth (green) and predictions (red).

    Ground truth is read from the YOLO label file that belongs to each image:
    same stem, ``.txt`` extension, in the ``labels`` directory next to the
    image's own directory. The figure is saved to
    ``OUTPUT_DIR/sample_predictions.png`` and shown.

    Args:
        model: Object whose ``predict(img, conf=0.5)`` returns results exposing
            ``boxes`` with ``xyxy``, ``cls`` and ``conf``.
        test_images: List of image paths to sample from.
        n_samples: Maximum number of images to draw.
    """
    samples = random.sample(test_images, min(n_samples, len(test_images)))
    if not samples:
        print("No test images available for visualization.")
        return

    plt.figure(figsize=(15, 10))
    for i, img_path in enumerate(samples):
        img = cv2.imread(img_path)
        if img is None:
            print(f"Skipping {img_path}: image could not be read")
            continue
        # Predict before drawing so the model sees the untouched image.
        results = model.predict(img, conf=0.5)
        img_h, img_w = img.shape[:2]

        # Draw ground truth
        img_dir, img_name = os.path.split(img_path)
        label_path = os.path.join(
            os.path.dirname(img_dir), 'labels', os.path.splitext(img_name)[0] + '.txt'
        )
        if os.path.exists(label_path):
            with open(label_path) as f:
                for line in f:
                    parts = line.split()
                    if len(parts) != 5:
                        continue  # Blank or malformed line
                    lbl = int(float(parts[0]))
                    x, y, w, h = (float(value) for value in parts[1:])
                    x1 = int((x - w/2) * img_w)
                    y1 = int((y - h/2) * img_h)
                    x2 = int((x + w/2) * img_w)
                    y2 = int((y + h/2) * img_h)
                    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(img, VALID_CLASSES[lbl], (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

        # Draw predictions
        for box in results[0].boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            cls = int(box.cls)
            conf = float(box.conf)
            cv2.rectangle(img, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(img, f"{VALID_CLASSES[cls]} {conf:.2f}", (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)

        # One column per sampled image, so a short test set leaves no empty slots
        plt.subplot(1, len(samples), i+1)
        plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        plt.axis('off')

    plt.savefig(f'{OUTPUT_DIR}/sample_predictions.png')
    plt.show()


In [None]:

# Function to compute t-SNE visualization
def visualize_tsne(model, test_images):
    """Plot a 2-D t-SNE embedding of model outputs for up to 100 test images.

    Each point is coloured by the first class listed in the image's YOLO label
    file (same stem, ``.txt`` extension, in the ``labels`` directory next to the
    image's own directory), or by a "Background" class when the file is missing
    or empty. The figure is saved to ``OUTPUT_DIR/tsne.png`` and shown.

    Args:
        model: Object whose ``model`` attribute is the underlying torch module.
        test_images: List of image paths; only the first 100 are used.
    """
    features = []
    labels = []
    background = len(VALID_CLASSES)  # Extra class id used for "no object"
    # Run on whatever device the weights live on rather than assuming a GPU.
    device = next(model.model.parameters()).device

    for img_path in test_images[:100]:  # Limit for speed
        img = cv2.imread(img_path)
        if img is None:
            print(f"Skipping {img_path}: image could not be read")
            continue
        img = cv2.resize(img, (640, 640))
        img_tensor = torch.from_numpy(img).permute(2, 0, 1).float().unsqueeze(0).to(device) / 255.0
        # NOTE(review): the [1][-1] indexing assumes the module returns a pair whose
        # second item is a sequence of tensors (eval-mode output) -- confirm.
        with torch.no_grad():
            feat = model.model.forward(img_tensor, augment=False)[1][-1].cpu().detach().numpy().flatten()
        features.append(feat)

        img_dir, img_name = os.path.split(img_path)
        label_path = os.path.join(
            os.path.dirname(img_dir), 'labels', os.path.splitext(img_name)[0] + '.txt'
        )
        first_cls = background
        if os.path.exists(label_path):
            with open(label_path) as f:
                for line in f:
                    parts = line.split()
                    if parts:
                        first_cls = int(float(parts[0]))
                        break
        labels.append(first_cls)

    if len(features) < 2:
        print("Not enough images for a t-SNE embedding.")
        return

    # Perplexity must be smaller than the number of samples; 30 is the default.
    perplexity = min(30.0, float(len(features) - 1))
    tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity)
    embeddings = tsne.fit_transform(np.array(features))

    plt.figure(figsize=(8, 6))
    # Fixed colour range so a class keeps its colour whichever classes are present.
    scatter = plt.scatter(
        embeddings[:, 0], embeddings[:, 1], c=labels, cmap='viridis', vmin=0, vmax=background
    )
    # Handles are returned for the classes actually present, in ascending id order,
    # so the legend names are built from that same list.
    class_names = VALID_CLASSES + ['Background']
    handles, _ = scatter.legend_elements(num=None)
    present = sorted(set(labels))
    plt.legend(handles=handles, labels=[class_names[idx] for idx in present])
    plt.title('t-SNE Visualization of Feature Embeddings')
    plt.savefig(f'{OUTPUT_DIR}/tsne.png')
    plt.show()


In [None]:
# --- End-to-end pipeline: data preparation, training, evaluation, reporting ---

# 1. Collect annotated images and write the train/val/test splits
print("Loading data...")
images, labels = load_data()

print("Splitting dataset...")
n_train, n_val, n_test = split_dataset(images, labels)
print(f"Train: {n_train}, Validation: {n_val}, Test: {n_test}")

# 2. Describe the dataset for YOLO and train on it
print("Creating YOLO config...")
create_yolo_config()

print("Training model...")
model, results = train_model()

# 3. Evaluate on the images that were written to the test split
print("Evaluating model...")
test_image_dir = f'{OUTPUT_DIR}/test/images'
test_images = [
    os.path.join(test_image_dir, file_name)
    for file_name in os.listdir(test_image_dir)
]
precision, recall, f1, auc_score = evaluate_model(model, test_images)

# 4. Produce the figures
print("Visualizing training curves...")
visualize_training_curves(results)

print("Visualizing sample predictions...")
visualize_predictions(model, test_images)

print("Visualizing t-SNE...")
visualize_tsne(model, test_images)

# 5. Persist the trained weights and the headline metrics
print("Saving model...")
model.save(f'{OUTPUT_DIR}/pavement_damage_yolo.pt')

print("Saving metrics...")
metric_lines = [
    f"Precision: {precision:.4f}\n",
    f"Recall: {recall:.4f}\n",
    f"F1-Score: {f1:.4f}\n",
    f"AUC: {auc_score:.4f}\n",
]
with open(f'{OUTPUT_DIR}/metrics.txt', 'w') as f:
    f.writelines(metric_lines)