In [None]:
import os
import collections
import pandas as pd
import numpy as np
import functools
import matplotlib.pyplot as plt
import cv2

from sklearn import preprocessing 

import xml.etree.ElementTree as ET

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torch
import torchvision

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, fasterrcnn_resnet50_fpn_v2, fasterrcnn_resnet50_fpn
from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights, FasterRCNN_ResNet50_FPN_V2_Weights
from torchvision.models import resnet50, ResNet50_Weights
from torchvision.models.detection.rpn import AnchorGenerator

from torch.utils.data import DataLoader, Dataset
from torch.utils.data import SequentialSampler

from PIL import Image
import seaborn as sns
import copy

import torchmetrics
from torchmetrics.detection import MeanAveragePrecision
from engine import train_one_epoch, evaluate
import utils
import torchvision.transforms.functional as tf
import wandb


In [None]:
import warnings
# Suppress every warning category for the rest of the session.
# NOTE(review): this also hides deprecation notices from the libraries used below.
warnings.filterwarnings("ignore")
from collections import defaultdict, deque
import datetime
import time
from tqdm import tqdm 
from torchvision.utils import draw_bounding_boxes


In [None]:
import shutil
import os

# Raw export folder and the two folders its files are sorted into.
source_dir = 'Construction/train'
img_dir = "custom_dataset/Images"
annot_dir = "custom_dataset/Annotations"

for target_dir in (img_dir, annot_dir):
    os.makedirs(target_dir, exist_ok=True)

# Move images (.jpg) and annotations (.xml) out of the export folder;
# files with any other extension are left where they are.
for filename in os.listdir(source_dir):
    if filename.endswith(".jpg"):
        destination = img_dir
    elif filename.endswith(".xml"):
        destination = annot_dir
    else:
        continue
    shutil.move(os.path.join(source_dir, filename), os.path.join(destination, filename))

In [None]:
def _voc_bbox_params():
    """Return a fresh copy of the bounding-box settings shared by every pipeline.

    Boxes are given as pascal_voc (xmin, ymin, xmax, ymax), travel together with
    the "labels" field, and are dropped when less than 15% stays visible.
    """
    return A.BboxParams(format="pascal_voc", min_visibility=0.15, label_fields=["labels"])


# Training pipeline: geometric + photometric augmentation, then a random 480x480 crop.
train_transform = A.Compose(
    [
        A.HorizontalFlip(),
        A.ShiftScaleRotate(rotate_limit=15, value=0, border_mode=cv2.BORDER_CONSTANT),
        # Exactly one of the three colour adjustments is always applied (p=1).
        A.OneOf([A.CLAHE(), A.RandomBrightnessContrast(), A.HueSaturationValue()], p=1),
        A.GaussNoise(),
        A.RandomResizedCrop(height=480, width=480),
    ],
    bbox_params=_voc_bbox_params(),
)

# Validation and test pipelines only resize to the training resolution.
val_transform = A.Compose([A.Resize(height=480, width=480)], bbox_params=_voc_bbox_params())
test_transform = A.Compose([A.Resize(height=480, width=480)], bbox_params=_voc_bbox_params())

In [None]:

# Class names; a name's position in this list is its integer label id.
# Index 0 is the "background" entry.
classes = [
    "background",
    "Excavator",
    "Gloves",
    "Hardhat",
    "Ladder",
    "Mask",
    "NO-Hardhat",
    "NO-Mask",
    "NO-Safety Vest",
    "Person",
    "SUV",
    "Safety Cone",
    "Safety Vest",
    "bus",
    "dump truck",
    "fire hydrant",
    "machinery",
    "mini-van",
    "sedan",
    "semi",
    "trailer",
    "truck",
    "truck and trailer",
    "van",
    "vehicle",
    "wheel loader",
]

num_classes = len(classes)

# Kept as a plain string because later cells compare it against "cuda".
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"


In [None]:
def parse_xml(annot_path):
    """Read the image size and all bounding boxes from a Pascal VOC XML file.

    Args:
        annot_path: Path of the ``.xml`` annotation file.

    Returns:
        ``(boxes, height, width)``: ``boxes`` is a list with one
        ``[xmin, ymin, xmax, ymax]`` list of ints per ``<object>`` element, in
        document order; ``height`` and ``width`` are the ints stored in ``<size>``.

    Raises:
        AttributeError: if an expected element is missing.
        ValueError: if a value is not numeric.
    """
    def _as_int(node, tag):
        # Some annotation tools write "12.0" or "12.5"; a bare int() raises on
        # those, so parse as float and round to the nearest pixel.
        return int(round(float(node.find(tag).text)))

    root = ET.parse(annot_path).getroot()

    size = root.find("size")
    width = _as_int(size, "width")
    height = _as_int(size, "height")

    boxes = []
    for obj in root.findall("object"):
        bbox = obj.find("bndbox")
        boxes.append([_as_int(bbox, tag) for tag in ("xmin", "ymin", "xmax", "ymax")])

    return boxes, height, width

In [None]:
# Names of the images whose annotation holds at least one invalid box: a box
# that is empty/inverted or that reaches outside the image size recorded in
# the XML. Each image is listed once, however many of its boxes are bad.
ignore_img = []
for annot_name in sorted(os.listdir(annot_dir)):
    stem, ext = os.path.splitext(annot_name)
    if ext.lower() != ".xml":
        # A stray non-XML file would make the XML parser raise.
        continue

    boxes, height, width = parse_xml(os.path.join(annot_dir, annot_name))
    has_bad_box = any(
        xmin < 0 or xmin >= xmax or xmax > width
        or ymin < 0 or ymin >= ymax or ymax > height
        for xmin, ymin, xmax, ymax in boxes
    )
    if has_bad_box:
        ignore_img.append(stem + ".jpg")

In [None]:
class VOCDataset(Dataset):
    """Object-detection dataset: a folder of images plus Pascal VOC XML files.

    Each item is ``(img, target)``:

    * ``img``: float32 tensor produced by ``to_tensor`` from the RGB image scaled
      to [0, 1].
    * ``target``: dict with ``"boxes"`` (float32, ``[N, 4]`` in xmin/ymin/xmax/ymax
      order), ``"labels"`` (int64, ``[N]``) and ``"id"`` (the sample index).

    Label ids are the positions of the annotation's ``<name>`` values in the
    module-level ``classes`` list. A sample without boxes yields empty ``(0, 4)``
    / ``(0,)`` tensors, which torchvision detection models treat as a negative
    (background-only) sample.

    Args:
        img_dir: Folder holding the image files.
        annot_dir: Folder holding one ``<image stem>.xml`` per image.
        transform: Optional albumentations-style callable invoked as
            ``transform(image=..., bboxes=..., labels=...)``.
    """

    def __init__(self,img_dir,annot_dir,transform=None):
        super().__init__()
        self.img_dir=img_dir
        self.annot_dir=annot_dir
        # Images flagged by the module-level ``ignore_img`` list are skipped;
        # a set keeps the membership test cheap.
        skipped=set(ignore_img)
        self.img_list=sorted(img for img in os.listdir(self.img_dir)
                             if img not in skipped)
        self.transform=transform
        # Annotation class name -> integer label id.
        self.label_to_id={name:index for index,name in enumerate(classes)}

    def __len__(self):
        return len(self.img_list)

    def _read_label_ids(self,annot_path):
        """Return the label id of every ``<object>`` in the file, in document order."""
        label_ids=[]
        for obj in ET.parse(annot_path).getroot().findall("object"):
            name=(obj.findtext("name") or "").strip()
            if name not in self.label_to_id:
                raise ValueError(f"Unknown class name {name!r} in {annot_path}")
            label_ids.append(self.label_to_id[name])
        return label_ids

    def __getitem__(self,idx):
        img_name=self.img_list[idx]
        img_path=os.path.join(self.img_dir,img_name)
        img=cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img=cv2.cvtColor(img,cv2.COLOR_BGR2RGB)

        annot_name=os.path.splitext(img_name)[0]+".xml"
        annot_path=os.path.join(self.annot_dir,annot_name)
        boxes,_height,_width=parse_xml(annot_path)
        # parse_xml and _read_label_ids both walk the <object> elements in
        # document order, so boxes[i] belongs to labels[i].
        labels=self._read_label_ids(annot_path)

        if self.transform is not None:
            transformed=self.transform(image=img,bboxes=boxes,labels=labels)
            img=transformed["image"]
            boxes=transformed["bboxes"]
            labels=transformed["labels"]

        # reshape keeps the [N, 4] / [N] layout even when no box survived (N == 0).
        boxes=np.asarray(boxes,dtype=np.float32).reshape(-1,4)
        labels=np.asarray(labels,dtype=np.int64).reshape(-1)

        # The array is scaled by hand because to_tensor only rescales uint8 input.
        img=tf.to_tensor(img/255).to(dtype=torch.float32)

        target={}
        target["boxes"]=torch.from_numpy(boxes)
        target["labels"]=torch.from_numpy(labels)
        target["id"]=torch.tensor(idx)

        return img,target


In [None]:
def collate_fn(batch):
    """Regroup a list of per-sample tuples into one tuple per field.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``.
    Samples are not stacked, so images of different sizes can share a batch.
    """
    fields = zip(*batch)
    return tuple(fields)

In [None]:
# One dataset object per split. All three read the same folders and differ only
# in the transform applied to each sample.
train_ds, val_ds, test_ds = (
    VOCDataset(img_dir, annot_dir, split_transform)
    for split_transform in (train_transform, val_transform, test_transform)
)


In [None]:
from torch.utils.data import Subset

In [None]:
# Split the samples 70% / 20% / 10% into disjoint train / val / test index sets.
# The three dataset objects list the same files in the same order, so a single
# permutation of indices addresses all of them.
SPLIT_SEED = 42  # fixed seed so that the split is reproducible across runs


def _unwrap_subset(ds):
    """Return the underlying dataset so that re-running this cell does not nest Subsets."""
    return ds.dataset if isinstance(ds, Subset) else ds


train_full = _unwrap_subset(train_ds)
val_full = _unwrap_subset(val_ds)
test_full = _unwrap_subset(test_ds)

n_samples = len(train_full)
idxs = np.random.default_rng(SPLIT_SEED).permutation(n_samples).tolist()

train_end = int(0.7 * n_samples)
val_end = int(0.9 * n_samples)
train_idx = idxs[:train_end]
val_idx = idxs[train_end:val_end]
test_idx = idxs[val_end:]

# No sample may appear in more than one split, and none may be lost.
assert len(set(train_idx) | set(val_idx) | set(test_idx)) == n_samples
assert len(train_idx) + len(val_idx) + len(test_idx) == n_samples

train_ds = Subset(train_full, train_idx)
val_ds = Subset(val_full, val_idx)
test_ds = Subset(test_full, test_idx)

In [None]:
batch_size = 2

# Arguments common to the three loaders; pinned memory is only requested on CUDA.
_loader_kwargs = dict(
    batch_size=batch_size,
    num_workers=os.cpu_count(),
    collate_fn=collate_fn,
    pin_memory=(device == "cuda"),
)

train_dl = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
val_dl = DataLoader(val_ds, shuffle=False, **_loader_kwargs)
# drop_last=True: a trailing incomplete batch of the test split is skipped.
test_dl = DataLoader(test_ds, shuffle=False, drop_last=True, **_loader_kwargs)

In [None]:
# Faster R-CNN (ResNet-50 + FPN) initialised from the pretrained detection
# checkpoint. That checkpoint already contains the backbone, so no separate
# backbone weights are requested, and the deprecated `pretrained` flag is not
# passed next to `weights`. All five backbone stages are left trainable.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
    weights=torchvision.models.detection.FasterRCNN_ResNet50_FPN_Weights.DEFAULT,
    trainable_backbone_layers=5,
)

# Replace the box classifier head with one sized for this dataset. The count is
# derived from the class list (which includes "background") so that the two
# cannot drift apart.
num_classes = len(classes)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

model = model.to(device)

In [None]:
# Hand only the parameters that still require gradients to the optimizer.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params, lr=0.001, weight_decay=0.0005)
# Step decay: the learning rate is multiplied by 0.1 every 5 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

In [None]:
# Clone the torchvision repository and copy its detection reference scripts
# (utils.py, transforms.py, coco_eval.py, engine.py, coco_utils.py) next to the notebook.
# NOTE(review): the first cell already does `from engine import ...` and `import utils`,
# so on a fresh kernel those files must exist before that cell runs — confirm cell order.
! git clone https://github.com/pytorch/vision.git
! cd vision;cp references/detection/utils.py ../;cp references/detection/transforms.py ../;cp references/detection/coco_eval.py ../;cp references/detection/engine.py ../;cp references/detection/coco_utils.py ../

In [None]:
def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Returns ``None`` when the optimizer has no parameter groups.
    """
    first_group = next(iter(optimizer.param_groups), None)
    if first_group is None:
        return None
    return first_group["lr"]

In [None]:
# Integer label id -> class name.
id_to_label = dict(enumerate(classes))


def preprocess_bbox(prediction, threshold, iou_threshold=0.5, class_agnostic=True):
    """Drop low-confidence detections, then apply non-maximum suppression.

    Args:
        prediction: Dict with ``"boxes"``, ``"scores"`` and ``"labels"`` tensors
            for one image.
        threshold: Detections scoring below this value are discarded.
        iou_threshold: Overlap above which the lower-scoring box is suppressed.
        class_agnostic: When True (the default, matching the previous behaviour)
            boxes suppress each other regardless of their label; when False,
            suppression only happens between boxes that share a label.

    Returns:
        ``(filtered, scores)``: ``filtered`` is a dict with the surviving
        ``"boxes"``, ``"scores"`` and ``"labels"``; ``scores`` is the same tensor
        as ``filtered["scores"]``.
    """
    mask = prediction["scores"] >= threshold
    boxes = prediction["boxes"][mask]
    scores = prediction["scores"][mask]
    labels = prediction["labels"][mask]

    if class_agnostic:
        keep = torchvision.ops.nms(boxes, scores, iou_threshold=iou_threshold)
    else:
        keep = torchvision.ops.batched_nms(boxes, scores, labels, iou_threshold)

    kept_scores = scores[keep]
    return {"boxes": boxes[keep], "scores": kept_scores, "labels": labels[keep]}, kept_scores


In [None]:
# Authenticate with Weights & Biases, then open a run in the "Faster R-CNN" project.
wandb.login()

wandb.init(project="Faster R-CNN")

## trainer


In [None]:

epochs = 5

# Per-class score containers keyed by confidence threshold (filled by later cells).
f1_scores_per_threshold = defaultdict(lambda: defaultdict(list))
recall_scores_per_threshold = defaultdict(lambda: defaultdict(list))

# Mean per-batch losses, one entry per epoch.
loss_history = {
    "training_loss": [],
    "validation_loss": [],
    "box_loss": [],       # box regression loss of the detection head
    "cls_loss": [],       # classification loss of the detection head
    "dfl_loss": []        # key kept for compatibility; nothing in this loop writes to it
}

# One MeanAveragePrecision result dict per epoch, computed on the validation split.
all_epoch_results = []

train_len=len(train_dl.dataset)
val_len=len(val_dl.dataset)
test_len=len(test_dl.dataset)

best_validation_loss=np.inf
best_weights=copy.deepcopy(model.state_dict())


def _to_device(imgs, targets):
    """Move one collated batch (images and target dicts) to the training device."""
    imgs = [img.to(device) for img in imgs]
    targets = [{k: v.to(device) for (k, v) in d.items()} for d in targets]
    return imgs, targets


def _evaluate_map(data_loader):
    """Run the model in eval mode over ``data_loader`` and return the mAP result dict."""
    metric = MeanAveragePrecision(box_format="xyxy", class_metrics=True)
    model.eval()
    with torch.no_grad():
        for imgs, targets in data_loader:
            predictions = model([img.to(device) for img in imgs])
            # The loader yields CPU targets, so predictions are moved to the CPU as well.
            predictions = [{k: v.cpu() for (k, v) in p.items()} for p in predictions]
            metric.update(predictions, list(targets))
    return metric.compute()


for epoch in range(epochs):

    # Running sums of the per-batch losses for this epoch.
    training_loss = 0.0
    validation_loss = 0.0
    box_loss = 0.0
    cls_loss = 0.0

    current_lr=get_lr(optimizer)
    model.train()

    for imgs,targets in train_dl:
        imgs,targets=_to_device(imgs,targets)

        loss_dict=model(imgs,targets)
        losses=sum(loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        training_loss+=losses.item()
        box_loss += loss_dict['loss_box_reg'].item()
        cls_loss += loss_dict['loss_classifier'].item()

    # torchvision detection models only return the loss dict in train mode, so
    # the model deliberately stays in train mode here; no_grad keeps this pass
    # from building a graph or touching the weights.
    with torch.no_grad():
        for imgs,targets in val_dl:
            imgs,targets=_to_device(imgs,targets)
            loss_dict=model(imgs,targets)
            validation_loss+=sum(loss_dict.values()).item()

    # Detection metrics of this epoch's weights (leaves the model in eval mode;
    # the next epoch switches back with model.train()).
    all_epoch_results.append(_evaluate_map(val_dl))

    # Record the best weights BEFORE a possible reload, otherwise a new best
    # epoch could be overwritten by older weights and then stored as "best".
    if validation_loss<best_validation_loss:
        best_validation_loss=validation_loss
        best_weights=copy.deepcopy(model.state_dict())
        print("Updating Best Model weights")

    # Only ReduceLROnPlateau takes the monitored value; epoch-based schedulers
    # such as StepLR would interpret the argument as an epoch number.
    if isinstance(lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
        lr_scheduler.step(validation_loss)
    else:
        lr_scheduler.step()

    if current_lr!=get_lr(optimizer):
        print("Loading best Model weights")
        model.load_state_dict(best_weights)

    # Each accumulated value is a per-batch loss, so the mean is taken over the
    # number of batches (guarded against an empty loader).
    n_train_batches = max(len(train_dl), 1)
    n_val_batches = max(len(val_dl), 1)
    loss_history["training_loss"].append(training_loss / n_train_batches)
    loss_history["validation_loss"].append(validation_loss / n_val_batches)
    loss_history["box_loss"].append(box_loss / n_train_batches)
    loss_history["cls_loss"].append(cls_loss / n_train_batches)

    print(f"\n{epoch+1}/{epochs}")
    print(f"Training Loss: {training_loss/n_train_batches}")
    print(f"Validation_loss: {validation_loss/n_val_batches}")
    print("\n"+"*"*50)

torch.save(best_weights,"model_rcnn.pth")
# Keep the in-memory model in sync with the checkpoint that was just saved, so
# that the evaluation cells below use the best weights rather than the last epoch's.
model.load_state_dict(best_weights)

In [None]:
# Display the per-epoch results list created by the training cell
# (it is an empty list when nothing was stored in it).
all_epoch_results

### Explore the output


In [None]:
from torchvision import transforms as torchtrans  
import matplotlib.patches as patches

def torch_to_pil(img):
    """Convert an image tensor into an RGB PIL image."""
    to_pil = torchtrans.ToPILImage()
    pil_img = to_pil(img)
    return pil_img.convert('RGB')

def plot_img_bbox(img, target):
    """Show ``img`` with every box of ``target['boxes']`` outlined in red.

    Args:
        img: Image in any form accepted by ``Axes.imshow`` (e.g. a PIL image).
        target: Mapping whose ``"boxes"`` entry holds rows of
            ``[xmin, ymin, xmax, ymax]`` as a tensor (any device), array or list.
    """
    fig, a = plt.subplots(1,1)
    fig.set_size_inches(5,5)
    a.imshow(img)

    boxes = target['boxes']
    if torch.is_tensor(boxes):
        # matplotlib cannot read CUDA tensors; convert to plain Python floats.
        boxes = boxes.detach().cpu().tolist()

    for box in boxes:
        xmin, ymin, xmax, ymax = (float(v) for v in box[:4])
        # Rectangle wants the top-left corner plus width and height.
        rect = patches.Rectangle((xmin, ymin),
                                 xmax - xmin, ymax - ymin,
                                 linewidth = 2,
                                 edgecolor = 'r',
                                 facecolor = 'none')

        # Draw the bounding box on top of the image
        a.add_patch(rect)
    plt.show()

In [None]:
# Visual sanity check on a couple of test samples: the ground-truth boxes are
# drawn first, then the model's detections for the same image.
SCORE_THRESHOLD = 0.5  # detections scoring below this are not drawn

model.eval()
for i in range(1, min(3, len(test_ds))):
    img, target = test_ds[i]
    with torch.no_grad():
        prediction = model([img.to(device)])[0]

    # Bring the detections back to the CPU before filtering and plotting.
    prediction = {key: value.cpu() for key, value in prediction.items()}
    detections = preprocess_bbox(prediction, SCORE_THRESHOLD)[0]

    pil_img = torch_to_pil(img)
    plot_img_bbox(pil_img, target)      # ground truth
    plot_img_bbox(pil_img, detections)  # model output

In [None]:
# Per-class scores, laid out as {class name: {epoch number (1-based): [values]}}.
f1_scores_per_class = defaultdict(lambda: defaultdict(list))
recall_scores_per_class = defaultdict(lambda: defaultdict(list))
map_scores_per_class = defaultdict(lambda: defaultdict(list))

for epoch_number, epoch_results in enumerate(all_epoch_results, start=1):
    # atleast_1d covers results in which the per-class entries are 0-dim tensors.
    class_ids = torch.atleast_1d(epoch_results['classes']).tolist()
    precisions = torch.atleast_1d(epoch_results['map_per_class']).tolist()
    recalls = torch.atleast_1d(epoch_results['mar_100_per_class']).tolist()

    for class_id, precision, recall in zip(class_ids, precisions, recalls):
        # Harmonic mean of the two values; 0 when their sum is not positive.
        f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

        class_name = id_to_label[class_id]

        f1_scores_per_class[class_name][epoch_number].append(f1_score)
        recall_scores_per_class[class_name][epoch_number].append(recall)
        map_scores_per_class[class_name][epoch_number].append(precision)

In [None]:
# Display the per-class recall values keyed by confidence threshold. The training
# cell creates this dict empty; the threshold-sweep evaluation cell further down
# the notebook is what puts values into it.
recall_scores_per_threshold

In [None]:
fig, ax = plt.subplots(1, 3, figsize=(20, 5))


def _plot_scores_per_epoch(axis, scores_per_class, ylabel, title):
    """Draw one line per class: the mean of the stored values for each epoch key."""
    for class_name, scores_by_epoch in scores_per_class.items():
        # Keep only the epochs that actually hold values.
        epoch_keys = sorted(key for key, values in scores_by_epoch.items() if values)
        means = [sum(scores_by_epoch[key]) / len(scores_by_epoch[key]) for key in epoch_keys]
        axis.plot(epoch_keys, means, marker='o', label=class_name)

    axis.set_xlabel('Epoch')
    axis.set_ylabel(ylabel)
    axis.set_title(title)
    if scores_per_class:
        axis.legend(loc='upper right', bbox_to_anchor=(1.4, 1.0))
    axis.grid(True)


# The per-class dicts are indexed by epoch, so the x axis is the epoch.
_plot_scores_per_epoch(ax[0], f1_scores_per_class, 'F1', 'F1 per Epoch')
_plot_scores_per_epoch(ax[1], recall_scores_per_class, 'Recall', 'Recall per Epoch')

# Overall mAP of every stored epoch result. A separate name is used so that the
# `epochs` count set by the training cell is not overwritten.
epoch_numbers = range(1, len(all_epoch_results) + 1)
map_scores = [epoch_result['map'].item() for epoch_result in all_epoch_results]

ax[2].plot(epoch_numbers, map_scores, marker='o', label='mAP')

ax[2].set_xlabel('Epoch')
ax[2].set_ylabel('mAP')
ax[2].set_title('mAP per Epoch')
ax[2].legend(loc='upper right', bbox_to_anchor=(1.5, 1.0))
ax[2].grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Display the per-class F1 values keyed by confidence threshold. The dict stays
# empty until the threshold-sweep evaluation cell below has run.
f1_scores_per_threshold

In [None]:
from tqdm import tqdm
from collections import defaultdict
wandb.init(project="RCNN")

id_to_label = {index: label for index, label in enumerate(classes)}

# {class name: {confidence threshold: [values]}}
f1_scores_per_threshold = defaultdict(lambda: defaultdict(list))
recall_scores_per_threshold = defaultdict(lambda: defaultdict(list))

confidence_thresholds = [i / 100.0 for i in range(1, 101)]

# The raw detections do not depend on the threshold, so the model runs over the
# test set once; every threshold is then applied to the cached CPU copies.
cached_predictions = []
cached_targets = []
model.eval()
with torch.no_grad():
    for imgs, targets in tqdm(test_dl, desc='Running inference'):
        imgs = [img.to(device) for img in imgs]
        for prediction in model(imgs):
            cached_predictions.append({k: v.cpu() for (k, v) in prediction.items()})
        cached_targets.extend({k: v.cpu() for (k, v) in d.items()} for d in targets)

for threshold in tqdm(confidence_thresholds, desc='Evaluating thresholds'):
    metric = MeanAveragePrecision(box_format='xyxy', class_metrics=True)

    # Scores of the detections kept at this threshold, grouped by class name.
    confidence_scores_per_class = {class_name: [] for class_name in id_to_label.values()}

    filtered_predictions = []
    for prediction in cached_predictions:
        processed_bbox, kept_scores = preprocess_bbox(prediction, threshold)
        filtered_predictions.append(processed_bbox)
        for label_id, score in zip(processed_bbox["labels"].tolist(), kept_scores.tolist()):
            confidence_scores_per_class[id_to_label[label_id]].append(score)

    metric.update(filtered_predictions, cached_targets)
    results = metric.compute()

    # atleast_1d covers results in which the per-class entries are 0-dim tensors.
    class_ids = torch.atleast_1d(results['classes']).tolist()
    per_class_map = torch.atleast_1d(results['map_per_class']).tolist()
    per_class_mar = torch.atleast_1d(results['mar_100_per_class']).tolist()

    # Store F1 scores and recall
    for class_id, precision, recall in zip(class_ids, per_class_map, per_class_mar):
        if precision < 0:  # negative values mark classes without valid data
            continue

        # `precision` is the metric's per-class mAP value; it is logged under the
        # existing "Precision/..." key.
        f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

        class_name = id_to_label[class_id]
        f1_scores_per_threshold[class_name][threshold].append(f1_score)
        recall_scores_per_threshold[class_name][threshold].append(recall)

        class_confidences = confidence_scores_per_class[class_name]
        wandb.log({
            f"Precision/{class_name}": precision,
            f"Recall/{class_name}": recall,
            f"F1/{class_name}": f1_score,
            f"Average Confidence/{class_name}": sum(class_confidences) / len(class_confidences) if class_confidences else 0,
            "threshold": threshold
        })

# Finish the wandb run
wandb.finish()

In [None]:
# Print, for every class, its mapping from confidence threshold to F1 values.
for class_name in f1_scores_per_threshold:
    print(class_name, f1_scores_per_threshold[class_name])

In [None]:
def _draw_confidence_curves(axis, scores_per_threshold, ylabel, title):
    """Plot, for each class, the mean stored score at every confidence threshold."""
    for class_name, scores_by_threshold in scores_per_threshold.items():
        thresholds = list(scores_by_threshold.keys())
        mean_scores = [sum(values) / len(values) for values in scores_by_threshold.values()]
        axis.plot(thresholds, mean_scores, label=class_name)

    axis.set_xlabel('Confidence')
    axis.set_ylabel(ylabel)
    axis.set_title(title)
    # The legend is anchored outside the axes so that it does not cover the curves.
    axis.legend(loc='upper right', bbox_to_anchor=(1.4, 1.0))
    axis.grid(True)


fig, ax = plt.subplots(1, 2, figsize=(15, 5))

_draw_confidence_curves(ax[0], f1_scores_per_threshold, 'F1', 'F1-Confidence')
_draw_confidence_curves(ax[1], recall_scores_per_threshold, 'Recall', 'Recall-Confidence')

plt.tight_layout()
plt.show()
