<a href="https://colab.research.google.com/github/lorenzopaoria/Smoking-detection-and-distance-analysis/blob/main/model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Train a model for cigarette, smoker, and non-smoker detection

In [None]:
import torch
import torchvision
import psutil
import os
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision import transforms
from PIL import Image
import numpy as np
import time
import matplotlib.pyplot as plt
from tqdm import tqdm

In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset paths used
# later in this notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
class CigaretteDataset(Dataset):
    """COCO-backed detection dataset for cigarettes, smokers and non-smokers.

    Each item is ``(image, target)`` where ``image`` is the transformed RGB
    image and ``target`` is a dict with ``boxes`` (float32, ``[x1, y1, x2, y2]``),
    ``labels`` (int64, see ``CLASS_LABELS``) and ``image_id``. An item is
    ``None`` when the image has no usable annotation; ``collate_fn`` is expected
    to drop such items.

    Args:
        coco_annotation_file: Path to the COCO JSON annotation file.
        image_dir: Directory that contains the image files.
        transform: Optional callable applied to the PIL image. Defaults to
            ``ToTensor`` followed by ImageNet normalisation.

    Raises:
        ValueError: If none of the expected categories exist in the COCO file.
    """

    # Contiguous label ids for the detection head (0 is reserved for background).
    CLASS_LABELS = {'cigarette': 1, 'smoker': 2, 'nonSmoker': 3}

    def __init__(self, coco_annotation_file, image_dir, transform=None):
        self.coco = COCO(coco_annotation_file)
        self.image_dir = image_dir
        # NOTE(review): torchvision detection models normalise their inputs
        # internally, so this default Normalize is probably applied twice.
        # Left unchanged because existing checkpoints were trained with it.
        self.transform = transform if transform is not None else transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        cat_ids = self.coco.getCatIds(catNms=list(self.CLASS_LABELS))
        if not cat_ids:
            raise ValueError(
                "None of the 'cigarette', 'smoker' or 'nonSmoker' categories "
                "were found in the COCO file."
            )
        # Resolve COCO category id -> training label once instead of calling
        # loadCats for every annotation.
        self.cat_id_to_label = {
            cat['id']: self.CLASS_LABELS[cat['name']]
            for cat in self.coco.loadCats(cat_ids)
        }
        self.image_ids = self._select_image_ids(cat_ids)

    def _select_image_ids(self, cat_ids):
        """Return the sorted ids of images containing at least one of ``cat_ids``."""
        # getImgIds(catIds=[a, b, c]) intersects the categories (images that
        # contain ALL of them), so query one category at a time and union.
        image_ids = set()
        for cat_id in cat_ids:
            image_ids.update(self.coco.getImgIds(catIds=[cat_id]))
        return sorted(image_ids)

    def _boxes_and_labels(self, annotations):
        """Convert COCO annotations into ``[x1, y1, x2, y2]`` boxes and labels."""
        boxes, labels = [], []
        for ann in annotations:
            label = self.cat_id_to_label.get(ann['category_id'])
            if label is None:
                # Annotation belongs to a category we do not train on.
                continue
            x, y, w, h = ann['bbox']
            if w <= 0 or h <= 0:
                # Degenerate boxes make Faster R-CNN raise during training.
                continue
            boxes.append([x, y, x + w, y + h])
            labels.append(label)
        return boxes, labels

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        img_id = self.image_ids[idx]
        img_info = self.coco.loadImgs(img_id)[0]
        image_path = os.path.join(self.image_dir, img_info['file_name'])
        image = Image.open(image_path).convert("RGB")

        annotations = self.coco.loadAnns(self.coco.getAnnIds(imgIds=img_id))
        boxes, labels = self._boxes_and_labels(annotations)

        if len(boxes) == 0:
            # Signal "skip this sample"; filtered out by collate_fn.
            return None

        target = {
            'boxes': torch.as_tensor(boxes, dtype=torch.float32),
            'labels': torch.as_tensor(labels, dtype=torch.int64),
            'image_id': torch.tensor([img_id])
        }

        return self.transform(image), target

In [None]:
def collate_fn(batch):
    """Collate detection samples into ``(images, targets)`` tuples.

    Samples that are ``None`` (the dataset's marker for "nothing usable in this
    image") are dropped. Detection models take variable-sized images, so the
    samples are transposed into tuples rather than stacked into one tensor.

    Args:
        batch: Iterable of ``(image, target)`` pairs and/or ``None``.

    Returns:
        A pair ``(images, targets)`` of equal-length tuples. When every sample
        was ``None`` the result is ``((), ())`` so callers can still unpack it.
    """
    samples = [sample for sample in batch if sample is not None]
    if not samples:
        # zip(*[]) would yield (), which cannot be unpacked into two values.
        return (), ()
    return tuple(zip(*samples))

In [None]:
def check_system_usage():
    """Print current CPU and RAM utilisation and, if CUDA is available, GPU memory use."""
    cpu_percent = psutil.cpu_percent()
    print(f"CPU Usage: {cpu_percent}%")

    ram_percent = psutil.virtual_memory().percent
    print(f"RAM Usage: {ram_percent}%")

    # Nothing more to report on CPU-only machines.
    if not torch.cuda.is_available():
        return

    allocated_gb = torch.cuda.memory_allocated() / 1e9
    print(f"GPU Memory Allocated: {allocated_gb:.2f} GB")

    reserved_gb = torch.cuda.memory_reserved() / 1e9
    print(f"GPU Memory Reserved: {reserved_gb:.2f} GB")

The evaluation below reports the standard COCO detection metrics:
AP (Average Precision) at various IoU thresholds,
AR (Average Recall) for various numbers of detections,
and mAP (mean Average Precision).

In [None]:
def evaluate_model(model, dataset, device):
    """Run COCO bounding-box evaluation of ``model`` on ``dataset`` and print the results.

    Prints one COCO summary per category in the ground truth, followed by an
    overall summary across all categories.

    Args:
        model: Detection model; called as ``model([image])`` in eval mode and
            expected to return a list of dicts with ``boxes``, ``scores`` and
            ``labels``.
        dataset: Dataset exposing ``.coco`` (ground truth) and yielding
            ``(image, target)`` pairs, or ``None`` for samples to skip.
        device: Device the images are moved to before inference.

    Returns:
        None. All results are printed.
    """
    print("\n=== Starting Validation ===")
    model.eval()
    coco_gt = dataset.coco

    # The model predicts training labels (1..3); the ground truth uses the
    # category ids of the COCO file. Map between them by category name so the
    # two cannot get out of sync.
    class_labels = {'cigarette': 1, 'smoker': 2, 'nonSmoker': 3}
    label_to_cat_id = {}
    for name, label in class_labels.items():
        matching_ids = coco_gt.getCatIds(catNms=[name])
        if matching_ids:
            label_to_cat_id[label] = matching_ids[0]

    detections = []
    evaluated_image_ids = []

    with torch.no_grad():
        for idx in tqdm(range(len(dataset)), desc="Validating", ncols=100):
            sample = dataset[idx]
            if sample is None:
                # The dataset returns None for images without usable boxes.
                continue
            image, target = sample
            image_id = target['image_id'].item()
            prediction = model([image.to(device)])[0]
            evaluated_image_ids.append(image_id)

            for box, score, label in zip(prediction['boxes'], prediction['scores'], prediction['labels']):
                category_id = label_to_cat_id.get(label.item())
                if category_id is None:
                    continue
                # The model emits corner format; COCO wants [x, y, width, height].
                x1, y1, x2, y2 = box.tolist()
                detections.append({
                    'image_id': image_id,
                    'category_id': category_id,
                    'bbox': [x1, y1, x2 - x1, y2 - y1],
                    'score': score.item()
                })

    print(f"\nProcessed {len(detections)} detections")

    if len(detections) == 0:
        print("No detections found during validation!")
        return

    try:
        coco_dt = coco_gt.loadRes(detections)
        coco_eval = COCOeval(coco_gt, coco_dt, iouType='bbox')
        # Score only the images that were actually run through the model.
        coco_eval.params.imgIds = evaluated_image_ids

        all_cat_ids = coco_gt.getCatIds()
        for cat in coco_gt.loadCats(all_cat_ids):
            coco_eval.params.catIds = [cat['id']]
            coco_eval.evaluate()
            coco_eval.accumulate()
            print(f"\nResults for {cat['name']}:")
            coco_eval.summarize()

        coco_eval.params.catIds = all_cat_ids
        coco_eval.evaluate()
        coco_eval.accumulate()
        print("\nOverall Results:")
        coco_eval.summarize()
    except Exception as e:
        # Best effort: report the failure instead of aborting the notebook.
        print(f"Error during evaluation: {str(e)}")

    print("\n=== Validation Complete ===\n")

In [None]:
def _validate_epoch(model, val_dataset, device, score_threshold=0.5):
    """Run COCO bbox evaluation on ``val_dataset`` and return mAP@0.5 (0 if unavailable)."""
    print("\nStarting validation...")
    coco_gt = val_dataset.coco

    # Map training labels (1..3) to the category ids used by the COCO file,
    # by name, so predictions are scored against the right category.
    class_labels = {'cigarette': 1, 'smoker': 2, 'nonSmoker': 3}
    label_to_cat_id = {}
    for name, label in class_labels.items():
        matching_ids = coco_gt.getCatIds(catNms=[name])
        if matching_ids:
            label_to_cat_id[label] = matching_ids[0]

    detections = []
    evaluated_image_ids = []
    model.eval()

    with torch.no_grad():
        for idx in tqdm(range(len(val_dataset)), desc="Validating", ncols=100):
            try:
                sample = val_dataset[idx]
                if sample is None:
                    continue
                image, target = sample
                if image is None or target is None:
                    continue

                image_id = target['image_id'].item()
                prediction = model([image.to(device)])[0]
                evaluated_image_ids.append(image_id)

                # Keep only confident predictions; compute the mask once.
                keep = prediction['scores'] > score_threshold
                for box, score, label in zip(prediction['boxes'][keep],
                                             prediction['scores'][keep],
                                             prediction['labels'][keep]):
                    category_id = label_to_cat_id.get(label.item())
                    if category_id is None:
                        continue
                    # Corner format -> COCO [x, y, width, height].
                    x1, y1, x2, y2 = box.tolist()
                    detections.append({
                        'image_id': image_id,
                        'category_id': category_id,
                        'bbox': [x1, y1, x2 - x1, y2 - y1],
                        'score': score.item()
                    })
            except Exception as e:
                # Best effort: one bad image must not abort the whole epoch.
                print(f"Error processing validation image {idx}: {str(e)}")
                continue

    if len(detections) == 0:
        print("No valid detections in validation set")
        return 0

    try:
        coco_dt = coco_gt.loadRes(detections)
        coco_eval = COCOeval(coco_gt, coco_dt, iouType='bbox')
        # Score only the images that were actually run through the model.
        coco_eval.params.imgIds = evaluated_image_ids
        coco_eval.evaluate()
        coco_eval.accumulate()
        coco_eval.summarize()

        # stats[1] is AP at IoU=0.50.
        if coco_eval.stats is not None and len(coco_eval.stats) > 1:
            return float(coco_eval.stats[1])
    except Exception as e:
        print(f"Error during evaluation: {str(e)}")
    return 0


def train_model(dataset, num_epochs=10, val_dataset=None, batch_size=8,
                learning_rate=0.0005, score_threshold=0.5):
    """Fine-tune a Faster R-CNN (ResNet-50 FPN) detector with 3 classes plus background.

    After every epoch a checkpoint ``model_epoch_<n>.pth`` is written. When
    ``val_dataset`` is given, the model is validated each epoch and the weights
    with the best mAP@0.5 so far are saved to ``best_model.pth``.

    Args:
        dataset: Training dataset yielding ``(image, target)`` pairs or ``None``.
        num_epochs: Number of passes over ``dataset``.
        val_dataset: Optional validation dataset exposing ``.coco``.
        batch_size: Training batch size.
        learning_rate: Adam learning rate.
        score_threshold: Minimum score for a prediction to count in validation.

    Returns:
        The trained model, in its state after the last epoch.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device('cuda' if use_cuda else 'cpu')
    print(f"Using device: {device}")

    model = fasterrcnn_resnet50_fpn(weights='DEFAULT')
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # 4 = background + cigarette + smoker + nonSmoker.
    model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes=4)
    model = model.to(device)

    data_loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_fn,
        num_workers=2,
        pin_memory=use_cuda,  # pinning only helps host-to-GPU copies
        persistent_workers=True
    )

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scaler = torch.amp.GradScaler() if use_cuda else None
    epoch_losses = []
    best_mAP = 0
    best_epoch = 0

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        num_batches = 0
        start_time = time.time()
        print(f"Starting Epoch {epoch+1}/{num_epochs}")

        for batch in tqdm(data_loader, desc=f"Epoch {epoch+1}/{num_epochs}", ncols=100, unit="batch"):
            # collate_fn drops None samples, so a batch can come back empty.
            if len(batch) != 2 or len(batch[0]) == 0:
                continue
            images, targets = batch
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            optimizer.zero_grad()

            if scaler is not None:
                with torch.amp.autocast(device_type='cuda'):
                    loss_dict = model(images, targets)
                    loss = sum(loss for loss in loss_dict.values())

                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
            else:
                loss_dict = model(images, targets)
                loss = sum(loss for loss in loss_dict.values())
                loss.backward()
                optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        # Average over the batches actually trained on; guard the empty case.
        avg_loss = total_loss / max(num_batches, 1)
        epoch_losses.append(avg_loss)

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}, Time: {time.time() - start_time:.2f}s")
        check_system_usage()

        # Reset every epoch so the checkpoint never records a stale value.
        mAP = None
        if val_dataset is not None:
            mAP = _validate_epoch(model, val_dataset, device, score_threshold)
            print(f"Epoch {epoch+1} mAP@0.5: {mAP:.4f}")

            if mAP > best_mAP:
                best_mAP = mAP
                best_epoch = epoch + 1
                torch.save(model.state_dict(), 'best_model.pth')
                print(f"New best model saved with mAP {best_mAP:.4f}")

        checkpoint = {
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': avg_loss,
            'mAP': mAP
        }
        torch.save(checkpoint, f'model_epoch_{epoch+1}.pth')
        print(f'Checkpoint saved: model_epoch_{epoch+1}.pth\n')

    if best_epoch > 0:
        print(f"Best model was from epoch {best_epoch} with mAP {best_mAP:.4f}")

    return model

In [None]:
if __name__ == "__main__":
    # Training split: COCO annotations and the directory holding the images.
    train_coco_annotation_file = '/content/drive/MyDrive/Photo/train/_annotations.coco.json'
    train_image_dir = '/content/drive/MyDrive/Photo/train'

    # Validation split, laid out the same way.
    valid_coco_annotation_file = '/content/drive/MyDrive/Photo/valid/_annotations.coco.json'
    valid_image_dir = '/content/drive/MyDrive/Photo/valid'

    dataset = CigaretteDataset(
        coco_annotation_file=train_coco_annotation_file,
        image_dir=train_image_dir,
    )
    val_dataset = CigaretteDataset(
        coco_annotation_file=valid_coco_annotation_file,
        image_dir=valid_image_dir,
    )

    # Train for 10 epochs, validating on the held-out split after each one.
    model = train_model(dataset=dataset, num_epochs=10, val_dataset=val_dataset)
    