In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
import torch
from torch.utils.data import DataLoader, Dataset
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F
from PIL import Image

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


In [None]:
import torchvision.transforms as T
import random

# Define transformations for training
train_transforms = T.Compose([
    T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    T.RandomHorizontalFlip(0.5),  # 50% chance to flip
    T.RandomRotation(degrees=10),  # Rotate randomly between -10 to 10 degrees
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Define transformations for validation (no augmentation, just normalization)
valid_transforms = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


In [None]:
import os
import torch
from torch.utils.data import Dataset
from PIL import Image
class RoadSignDataset(Dataset):
    def __init__(self, images_dir, labels_dir, transforms=None):
        self.images_dir = images_dir
        self.labels_dir = labels_dir
        self.transforms = transforms
        self.image_files = sorted([f for f in os.listdir(images_dir) if f.endswith('.jpg') or f.endswith('.png')])

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        # Load image
        img_path = os.path.join(self.images_dir, self.image_files[idx])
        image = Image.open(img_path).convert("RGB")

        # Use the full image file name (excluding the extension) for the label file name
        label_file_name = os.path.splitext(self.image_files[idx])[0] + '.txt'
        label_path = os.path.join(self.labels_dir, label_file_name)

        boxes = []
        labels = []

        # Check if label file exists
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                for line in f:
                    data = line.strip().split()
                    class_id = int(data[0])
                    x_center, y_center, width, height = map(float, data[1:])

                    # Convert YOLO format to (xmin, ymin, xmax, ymax) format
                    xmin = x_center - width / 2
                    ymin = y_center - height / 2
                    xmax = x_center + width / 2
                    ymax = y_center + height / 2

                    boxes.append([xmin, ymin, xmax, ymax])
                    labels.append(class_id)
        else:
            print(f"Warning: No label file found for {img_path}. Using an empty label.")

        # If no labels are found, set default empty tensors for boxes and labels
        if not boxes:
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros((0,), dtype=torch.int64)

        # Convert to tensor
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((boxes.shape[0],), dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": image_id,
            "area": area,
            "iscrowd": iscrowd
        }

        # Apply transforms
        if self.transforms:
            image = self.transforms(image)

        return image, target



In [None]:
import torchvision.transforms as T

transform = T.Compose([
    T.ToTensor(),
])

In [None]:
train_dataset = RoadSignDataset(
    images_dir='/kaggle/input/road-sign/Road_Sign_Detection  3/train/images/',
    labels_dir='/kaggle/input/road-sign/Road_Sign_Detection  3/train/labels/',
    transforms=train_transforms
)

valid_dataset = RoadSignDataset(
    images_dir='/kaggle/input/road-sign/Road_Sign_Detection  3/valid/images/',
    labels_dir='/kaggle/input/road-sign/Road_Sign_Detection  3/valid/labels/',
    transforms=valid_transforms
)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))
valid_loader = DataLoader(valid_dataset, batch_size=8, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))



In [None]:
# next(iter(train_loader))

In [None]:
import torchvision

# Set the number of classes based on data.yaml
num_classes = 25  
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)
model.to(device)


In [None]:
import torch.optim as optim
from tqdm import tqdm

# Define AdamW optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.AdamW(params, lr=0.0001, weight_decay=0.0005)

# Training loop
num_epochs = 50
save_path = "fasterrcnn_resnet50_checkpoint.pth"  # Define checkpoint path

for epoch in range(num_epochs):
    model.train()
    train_loss = 0  # Track training loss for the epoch

    # Training phase
    for i, (images, targets) in enumerate(tqdm(train_loader, desc=f"Training Epoch {epoch + 1}")):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Forward pass
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        train_loss += losses.item()

        # Backpropagation
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if i % 10 == 0:  # Print progress every 10 steps
            print(f"Epoch [{epoch + 1}/{num_epochs}], Step [{i}/{len(train_loader)}], Loss: {losses.item()}")

    avg_train_loss = train_loss / len(train_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}] Average Training Loss: {avg_train_loss:.4f}")

    # Validation phase
    model.train()  # Switch to train mode temporarily to get loss with targets
    val_loss = 0  # Track validation loss for the epoch
    with torch.no_grad():  # Ensure no gradients are calculated
        for images, targets in tqdm(valid_loader, desc=f"Validation Epoch {epoch + 1}"):
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Forward pass for validation with targets to get loss
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            val_loss += losses.item()

    avg_val_loss = val_loss / len(valid_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}] Average Validation Loss: {avg_val_loss:.4f}")

    # Switch back to eval mode after validation
    model.eval()


    # Validation phase
#     model.eval()
#     val_loss = 0  # Track validation loss for the epoch
#     with torch.no_grad():  # No gradient calculation during validation
#         for images, targets in tqdm(valid_loader, desc=f"Validation Epoch {epoch + 1}"):
#             images = [image.to(device) for image in images]
#             targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

#             # Forward pass for validation
#             loss_dict = model(images, targets)
#             losses = sum(loss for loss in loss_dict.values())
#             val_loss += losses.item()

#     avg_val_loss = val_loss / len(valid_loader)
#     print(f"Epoch [{epoch + 1}/{num_epochs}] Average Validation Loss: {avg_val_loss:.4f}")

    # Save model checkpoint every 5 epochs
    if (epoch + 1) % 5 == 0:
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'train_loss': avg_train_loss,
            'val_loss': avg_val_loss,
        }, save_path)
        print(f"Checkpoint saved at epoch {epoch + 1}")

print("Training complete.")


In [None]:
# torch.save(model.state_dict(), "fasterrcnn_resnet50.pth")

In [None]:
import torch
import torchvision

# Number of classes (from data.yaml)
num_classes = 25  

# Initialize the model and load the trained weights
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=False)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)

# Load the model checkpoint
checkpoint_path = "/kaggle/working/fasterrcnn_resnet50_checkpoint.pth"  # Adjust path as necessary
checkpoint = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device)
model.eval()


In [None]:
from torchvision import transforms
from PIL import Image
import os

test_images_dir = '/kaggle/input/road-sign/Road_Sign_Detection  3/test'  # Adjust path as necessary
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load test images
test_images = [f for f in os.listdir(test_images_dir) if f.endswith('.jpg') or f.endswith('.png')]


In [None]:
predictions = []

for img_name in test_images:
    img_path = os.path.join(test_images_dir, img_name)
    image = Image.open(img_path).convert("RGB")
    image = test_transform(image).to(device)
    
    # Add a batch dimension
    with torch.no_grad():
        outputs = model([image])
    
    # Get the top predicted class for the first detected object, if available
    if len(outputs[0]['labels']) > 0:
        pred_class = outputs[0]['labels'][0].item()  # Get the first prediction label
    else:
        pred_class = None  # No prediction (optional: assign a default value)
    
    predictions.append({"filename": img_name, "class": pred_class})


In [None]:
import pandas as pd

# Create DataFrame for submission
submission_df = pd.DataFrame(predictions)
submission_df.to_csv("submission.csv", index=False)

In [None]:
import pandas as pd

# Load submission.csv
submission_df = pd.read_csv("/kaggle/working/submission.csv")

# Define class names based on provided data.yaml
class_names = [
    '-Road narrows on right', 'Attention Please-', 'Beware of children', 'CYCLE ROUTE AHEAD WARNING', 'Crosswalk',
    'Dangerous Left Curve Ahead', 'Dangerous Right Curve Ahead', 'No Entry', 'No_Over_Taking', 'One way road',
    'Speed bump ahead', 'Speed limit', 'Speed limit 100 Kph', 'Speed limit 120 Kph', 'Speed limit 20 Kph',
    'Speed limit 30 Kph', 'Speed limit 40 Kph', 'Speed limit 50 Kph', 'Speed limit 60 Kph', 'Speed limit 70 Kph',
    'Speed limit 80 Kph', 'Speed limit 90 Kph', 'Stop_Sign', 'Uneven Road', 'roundabout'
]

# Create a mapping from class numbers to names
class_mapping = {i: name for i, name in enumerate(class_names)}

# Replace class numbers with class names
submission_df['class'] = submission_df['class'].apply(lambda x: class_mapping.get(x, 'none'))

# Save the updated submission file
submission_df.to_csv("submission.csv", index=False)

print("Updated submission_with_names.csv created.")
