In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive; every dataset and checkpoint path
# used later in this notebook lives under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [21]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import numpy as np
import torchvision.transforms as T
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

class SeatbeltDataset(Dataset):
    """Image / box / mask dataset feeding a torchvision detection model.

    Layout read from ``root_dir``::

        images/<name>.jpg   image, loaded as RGB
        images/<name>.npy   optional mask array stored next to the image
        labels/<name>.txt   optional annotations, one
                            ``class x_min y_min x_max y_max`` per line

    Each item is ``(image, target)`` where ``target`` holds:

    * ``boxes``  -- float32 tensor of shape (K, 4)
    * ``labels`` -- int64 tensor of shape (K,)
    * ``masks``  -- uint8 tensor; (K, H, W) when no mask file exists,
      otherwise whatever the ``.npy`` file holds (2-D arrays are promoted
      to (1, H, W)).

    NOTE(review): torchvision's Mask R-CNN expects boxes in absolute pixel
    ``[x1, y1, x2, y2]`` form. The values are read from the label file
    unchanged; if the files store normalized or centre/size boxes they must
    be converted before training -- confirm the label format.
    """

    def __init__(self, root_dir, transform=None):
        """Index every ``.jpg`` file under ``root_dir/images``.

        Args:
            root_dir: Directory containing the ``images`` and ``labels``
                sub-directories.
            transform: Optional callable applied to the PIL image only
                (the target is returned untouched).
        """
        self.root_dir = root_dir
        self.transform = transform
        self.image_dir = os.path.join(root_dir, 'images')
        self.label_dir = os.path.join(root_dir, 'labels')
        self.image_files = [f for f in os.listdir(self.image_dir) if f.endswith('.jpg')]

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.image_files)

    @staticmethod
    def _read_annotations(label_path):
        """Parse a label file into ``(boxes, labels)`` Python lists.

        Blank lines are ignored and lines with fewer than five fields are
        reported and skipped instead of raising ``IndexError``.
        """
        boxes = []
        labels = []
        with open(label_path, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                parts = line.split()
                if not parts:
                    continue  # tolerate blank / trailing empty lines
                if len(parts) < 5:
                    print(f"Warning: Malformed annotation at {label_path}:{line_no}. Skipping line.")
                    continue
                class_id = int(parts[0])
                box = [float(value) for value in parts[1:5]]
                boxes.append(box)
                labels.append(class_id)
        return boxes, labels

    @staticmethod
    def _keep_valid_boxes(boxes, labels):
        """Drop degenerate boxes; return ``(K, 4)`` float32 and ``(K,)`` int64 tensors.

        A box is kept only when ``x_min < x_max`` and ``y_min < y_max``.
        Rejected boxes are printed so bad annotations stay visible.
        """
        # reshape() keeps the (0, 4) / (0,) shapes when there are no annotations.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64).reshape(-1)
        keep = (boxes[:, 0] < boxes[:, 2]) & (boxes[:, 1] < boxes[:, 3])
        for box in boxes[~keep]:
            print(f"Invalid box: {box}")
        return boxes[keep], labels[keep]

    @staticmethod
    def _load_masks(mask_path, height, width, num_instances):
        """Load instance masks, or build all-zero placeholders.

        Returns a uint8 tensor. When the file is missing, one empty
        ``height x width`` mask is produced per instance so the mask count
        matches the box count.
        """
        if os.path.exists(mask_path):
            masks = torch.tensor(np.load(mask_path), dtype=torch.uint8)
            if masks.dim() == 2:
                # A single (H, W) mask becomes a one-instance (1, H, W) stack.
                masks = masks.unsqueeze(0)
            # NOTE(review): masks loaded from disk are not filtered alongside
            # invalid boxes; confirm the file holds one mask per kept box.
            return masks
        print(f"Warning: Mask file not found: {mask_path}. Using default mask.")
        return torch.zeros((num_instances, height, width), dtype=torch.uint8)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the image at position ``idx``."""
        img_name = self.image_files[idx]
        # splitext only touches the extension; str.replace would also rewrite
        # any other '.jpg' occurring earlier in the path.
        stem = os.path.splitext(img_name)[0]
        img_path = os.path.join(self.image_dir, img_name)
        mask_path = os.path.join(self.image_dir, stem + '.npy')
        label_path = os.path.join(self.label_dir, stem + '.txt')

        # Load image
        image = Image.open(img_path).convert("RGB")
        width, height = image.size  # PIL reports (width, height)

        # Load annotations
        if os.path.exists(label_path):
            boxes, labels = self._read_annotations(label_path)
        else:
            print(f"Warning: Label file not found: {label_path}. Skipping.")
            boxes, labels = [], []

        # Validate boxes
        valid_boxes, valid_labels = self._keep_valid_boxes(boxes, labels)

        target = {}
        target['boxes'] = valid_boxes
        target['labels'] = valid_labels
        # Masks stay 3-D (N, H, W); no extra leading dimension is added.
        target['masks'] = self._load_masks(mask_path, height, width, len(valid_boxes))

        if self.transform:
            image = self.transform(image)

        return image, target

# Training-time preprocessing: only convert the PIL image to a tensor.
transform = T.Compose([T.ToTensor()])

train_dataset = SeatbeltDataset(
    root_dir='/content/drive/MyDrive/SeatBelt_project/Robo_datasets/SeatBelt-1/train',
    transform=transform,
)

# Each sample is an (image, target) pair; the collate function regroups a
# batch into a tuple of images and a tuple of targets instead of stacking.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda batch: tuple(zip(*batch)),
)

def get_model_instance_segmentation(num_classes, hidden_layer=256):
    """Build a Mask R-CNN (ResNet-50 FPN) with heads sized for ``num_classes``.

    The model is created with ``weights='DEFAULT'`` and both of its
    prediction heads are replaced by freshly constructed ones.

    Args:
        num_classes: Number of output classes passed to both new heads.
        hidden_layer: Hidden-layer size passed to ``MaskRCNNPredictor``.
            Defaults to 256, the value previously hard-coded here.

    Returns:
        The model with its box and mask predictors replaced.
    """
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights='DEFAULT')

    # Swap the box classification head, keeping its input feature size.
    box_in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(box_in_features, num_classes)

    # Swap the mask head, keeping the input channel count of its first layer.
    mask_in_channels = model.roi_heads.mask_predictor.conv5_mask.in_channels
    model.roi_heads.mask_predictor = MaskRCNNPredictor(mask_in_channels, hidden_layer, num_classes)

    return model

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
num_classes = 2  # Background and seatbelt
model = get_model_instance_segmentation(num_classes)
model.to(device)

# Only parameters that require gradients are handed to the optimizer.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
num_epochs = 25

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    trained_batches = 0
    failed_batches = 0
    for images, targets in train_dataloader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        optimizer.zero_grad()
        try:
            # In train mode the model is called with targets and returns a
            # dict of losses, which are summed into a single scalar.
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            losses.backward()
            optimizer.step()
            epoch_loss += losses.item()
            trained_batches += 1
        except Exception as e:
            # Best effort: one bad batch should not stop the run, but the
            # failure is counted so it cannot go unnoticed.
            failed_batches += 1
            print(f"Error during training: {e}")
            continue

    if failed_batches and trained_batches == 0:
        # Every batch failed: nothing was learned this epoch, so continuing
        # would only end with untrained weights being saved below.
        raise RuntimeError(
            f"Epoch {epoch+1}: all {failed_batches} batches failed; aborting training."
        )

    lr_scheduler.step()
    print(f"Epoch {epoch+1}, Loss: {epoch_loss}")
    if failed_batches:
        print(f"Epoch {epoch+1}: skipped {failed_batches} failed batch(es); "
              f"{trained_batches} batch(es) trained.")

    # # Save model checkpoint
    # if (epoch + 1) % 5 == 0:
    #     torch.save(model.state_dict(), f'/content/drive/MyDrive/SeatBelt_model_epoch_{epoch+1}.pth')

# Save the final model
torch.save(model.state_dict(), '/content/drive/MyDrive/SeatBelt_project/Robo_datasets/SeatBelt_model_final.pth')

Invalid box: tensor([0.3229, 0.3209, 0.3016, 0.3005])
Invalid box: tensor([0.2674, 0.3023, 0.2517, 0.3096])
Invalid box: tensor([0.4943, 0.4055, 0.4876, 0.4077])
Invalid box: tensor([0.3856, 0.2719, 0.3276, 0.2991])
Invalid box: tensor([0.1927, 0.2317, 0.2140, 0.2174])
Invalid box: tensor([0.4611, 0.3783, 0.1952, 0.8507])
Invalid box: tensor([0.6715, 0.1467, 0.6026, 0.1469])
Invalid box: tensor([0.5917, 0.2097, 0.3382, 0.6052])
Invalid box: tensor([0.6514, 0.3459, 0.6000, 0.2968])
Invalid box: tensor([0.5534, 0.3163, 0.3959, 0.5439])
Invalid box: tensor([0.3556, 0.2414, 0.3392, 0.2493])
Invalid box: tensor([0.3678, 0.2355, 0.3224, 0.2550])
Invalid box: tensor([0.4693, 0.3336, 0.1970, 0.7619])
Invalid box: tensor([0.6430, 0.2852, 0.3215, 0.7050])
Invalid box: tensor([0.6181, 0.1962, 0.3030, 0.7150])
Invalid box: tensor([0.4901, 0.2755, 0.2376, 0.6227])
Invalid box: tensor([0.6181, 0.2903, 0.2601, 0.9015])
Invalid box: tensor([0.2912, 0.1397, 0.2729, 0.1381])
Invalid box: tensor([0.5927,

KeyboardInterrupt: 

In [None]:
# import numpy as np
# from torchvision.ops import box_iou

# def evaluate_model(model, dataloader, device):
#     model.eval()
#     all_predictions = []
#     all_targets = []

#     with torch.no_grad():
#         for images, targets in dataloader:
#             images = list(image.to(device) for image in images)
#             targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

#             outputs = model(images)
#             all_predictions.extend(outputs)
#             all_targets.extend(targets)

#     return all_predictions, all_targets

# def compute_iou(box1, box2):
#     return box_iou(box1.unsqueeze(0), box2.unsqueeze(0)).item()

# def compute_map(predictions, targets, iou_threshold=0.5):
#     average_precisions = []

#     for prediction, target in zip(predictions, targets):
#         pred_boxes = prediction['boxes'].cpu().numpy()
#         pred_scores = prediction['scores'].cpu().numpy()
#         target_boxes = target['boxes'].cpu().numpy()

#         num_gt_boxes = len(target_boxes)
#         num_pred_boxes = len(pred_boxes)

#         if num_gt_boxes == 0:
#             continue

#         tp = np.zeros(num_pred_boxes)
#         fp = np.zeros(num_pred_boxes)

#         for i, pred_box in enumerate(pred_boxes):
#             iou_max = 0.0
#             for gt_box in target_boxes:
#                 iou = compute_iou(torch.tensor(pred_box), torch.tensor(gt_box))
#                 if iou > iou_max:
#                     iou_max = iou

#             if iou_max > iou_threshold:
#                 tp[i] = 1
#             else:
#                 fp[i] = 1

#         fp_cumsum = np.cumsum(fp)
#         tp_cumsum = np.cumsum(tp)
#         recalls = tp_cumsum / num_gt_boxes
#         precisions = tp_cumsum / (tp_cumsum + fp_cumsum)

#         recall_thresholds = np.linspace(0, 1, 11)
#         precision_interpolated = np.zeros_like(recall_thresholds)

#         for i, threshold in enumerate(recall_thresholds):
#             precisions_above_threshold = precisions[recalls >= threshold]
#             if precisions_above_threshold.size > 0:
#                 precision_interpolated[i] = precisions_above_threshold.max()
#             else:
#                 precision_interpolated[i] = 0.0

#         average_precision = precision_interpolated.mean()
#         average_precisions.append(average_precision)

#     mean_average_precision = np.mean(average_precisions) if average_precisions else 0.0
#     return mean_average_precision

# def visualize_results(images, targets, outputs):
#     for i in range(len(images)):
#         image = images[i].permute(1, 2, 0).cpu().numpy()
#         target = targets[i]
#         output = outputs[i]

#         plt.figure(figsize=(15, 5))

#         plt.subplot(1, 3, 1)
#         plt.imshow(image)
#         plt.title('Image')

#         plt.subplot(1, 3, 2)
#         mask = target['masks'][0].cpu().numpy()
#         plt.imshow(mask, cmap='gray')
#         plt.title('Ground Truth Mask')

#         plt.subplot(1, 3, 3)
#         if output['masks'].shape[0] > 0:
#             pred_mask = output['masks'][0, 0].cpu().numpy()
#         else:
#             pred_mask = np.zeros_like(mask)
#         plt.imshow(pred_mask, cmap='gray')
#         plt.title(f'Predicted Mask')

#         plt.show()

# # Evaluation
# model.eval()
# predictions, targets = evaluate_model(model, val_dataloader, device)
# mAP = compute_map(predictions, targets)
# print(f"Mean Average Precision (mAP): {mAP:.2f}")

# # Visualization
# images, targets = next(iter(val_dataloader))
# images = list(image.to(device) for image in images)
# with torch.no_grad():
#     outputs = model(images)

# visualize_results(images, targets, outputs)

In [None]:
import os
from PIL import Image
from torchvision.transforms import functional as F
from torch.utils.data import Dataset

class TestDataset(Dataset):
    """Unlabelled image dataset yielding ``(image, file_name)`` pairs."""

    def __init__(self, image_dir, transform=None):
        """Collect the names of all ``.jpg`` files found in ``image_dir``.

        Args:
            image_dir: Directory that is listed for ``.jpg`` files.
            transform: Optional callable applied to each loaded image.
        """
        self.image_dir = image_dir
        self.transform = transform
        self.image_files = []
        for entry in os.listdir(image_dir):
            if entry.endswith('.jpg'):
                self.image_files.append(entry)

    def __len__(self):
        """Number of ``.jpg`` files that were found."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return it together with its file name."""
        file_name = self.image_files[idx]
        picture = Image.open(os.path.join(self.image_dir, file_name)).convert("RGB")

        # The transform is optional; without one the PIL image is returned.
        if self.transform:
            picture = self.transform(picture)

        return picture, file_name

from torchvision.transforms import ToTensor, Resize, Compose

# Test-time preprocessing: resize every image to 800x800 (adjust as needed),
# then convert it to a tensor.
# Note: this rebinds the name `transform` that the training cell also uses.
_test_steps = [Resize((800, 800)), ToTensor()]
transform = Compose(_test_steps)
del _test_steps


from torch.utils.data import DataLoader

# Define test dataset and dataloader
test_dataset = TestDataset('/content/drive/MyDrive/SeatBelt_project/DATA/testing', transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=False)

# Load the trained model
# `model` and `device` come from the training cell, which must have run first.
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved from a GPU session also loads on a CPU-only runtime.
# NOTE(review): this path differs from the one the training cell saves to
# ('.../SeatBelt_project/Robo_datasets/SeatBelt_model_final.pth') -- confirm
# which checkpoint is intended.
model.load_state_dict(
    torch.load('/content/drive/MyDrive/mask_rcnn_seatbelt.pth', map_location=device)
)
model.to(device)
model.eval()

# Run predictions
def predict_on_test_set(model, dataloader, device):
    """Run ``model`` over every batch of ``dataloader`` and collect the results.

    Args:
        model: Callable taking a list of image tensors and returning one
            dict of tensors per image. The caller is responsible for having
            put it in eval mode.
        dataloader: Iterable yielding ``(images, files)`` batches.
        device: Device each image is moved to before the forward pass.

    Returns:
        ``(predictions, image_files)``: a list with one output dict per
        image (tensors moved to CPU) and the matching list of file names.
    """
    predictions = []
    image_files = []

    with torch.no_grad():
        for images, files in dataloader:
            images = [image.to(device) for image in images]
            outputs = model(images)
            # Move each result off the accelerator right away; keeping the
            # outputs of the whole test set on the GPU would accumulate
            # memory with every batch.
            predictions.extend(
                {key: value.detach().cpu() for key, value in output.items()}
                for output in outputs
            )
            image_files.extend(files)

    return predictions, image_files

# Run inference over the whole test set; `predictions` and `image_files`
# are filled in the same order, one entry per test image.
predictions, image_files = predict_on_test_set(model, test_dataloader, device)


def visualize_test_results(images, predictions, image_files):
    """Show each image with its first predicted mask overlaid.

    Args:
        images: Sequence of image tensors in (C, H, W) layout; each one is
            permuted to (H, W, C) for display.
        predictions: Sequence of per-image dicts with a ``'masks'`` tensor
            that is indexed as ``[instance, channel]``.
        image_files: Sequence of file names used as figure titles.
    """
    # pyplot is not imported anywhere else in this notebook, so it is
    # imported here to keep the function self-contained.
    import matplotlib.pyplot as plt

    for image, prediction, image_file in zip(images, predictions, image_files):
        image = image.permute(1, 2, 0).cpu().numpy()

        plt.figure(figsize=(10, 10))
        plt.imshow(image)
        plt.title(f'Image: {image_file}')

        # Only overlay a mask when the model produced at least one instance.
        if prediction['masks'].shape[0] > 0:
            pred_mask = prediction['masks'][0, 0].cpu().numpy()
            plt.imshow(pred_mask, cmap='jet', alpha=0.5)  # Overlay mask
        plt.show()

# Visualize results
# test_dataset already applies `transform` (Resize + ToTensor) in
# __getitem__, so its images are ready-to-plot tensors. Running `transform`
# on them a second time would hand a tensor to ToTensor, which only accepts
# PIL images / ndarrays and raises TypeError.
images = [test_dataset[idx][0] for idx in range(len(test_dataset))]
visualize_test_results(images, predictions, image_files)