In [None]:
import os
import sys
import pickle
import cv2
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from detectron2.config import get_cfg
from detectron2.engine import DefaultPredictor
sys.path.append("../")
from codes.utils.img_utils import add_bboxes_on_image
from codes.video_handling import videoObj
from codes.utils.evaluation import filter_preds_score_video
from codes.time_consistency import run_on_video
from tqdm import tqdm
import xml.etree.ElementTree as ET
from collections import defaultdict
import torch
import numpy as np

Functions

In [None]:
### load_tire_annotations collects the raw "tire" box attributes of a CVAT .xml file, grouped by frame
def load_tire_annotations(xml_path):
    """Return ``{frame_number: [box_attrib, ...]}`` for every "tire" track.

    ``xml_path`` is handed to ``ET.parse``. Each ``box_attrib`` is the
    unmodified attribute mapping of a ``<box>`` element, so its values are
    still strings; only the ``frame`` attribute is converted (to ``int``)
    to serve as the dictionary key.
    """
    boxes_by_frame = {}
    document_root = ET.parse(xml_path).getroot()

    for track in document_root.findall('.//track'):
        # Tracks of any other label are ignored.
        if track.attrib['label'] != "tire":
            continue
        for box in track.findall('.//box'):
            frame_index = int(box.attrib['frame'])
            boxes_by_frame.setdefault(frame_index, []).append(box.attrib)

    return boxes_by_frame

### generate_detection_video_with_gt writes a copy of the video with the detections (red) and the ground-truth tire boxes (green) drawn on it
def generate_detection_video_with_gt(video_path, preds_frames, annotations_path, output_path):
    """Render predictions and ground-truth tire boxes onto a video.

    Args:
        video_path: input video, opened with ``cv2.VideoCapture``.
        preds_frames: mapping keyed by ``'frame_00042'``-style strings; each
            value is a mapping whose ``'instances'`` entry exposes
            ``pred_boxes.tensor`` and ``scores``
            (presumably detectron2 ``Instances`` -- confirm against the producer).
        annotations_path: CVAT .xml file read by ``load_tire_annotations``.
        output_path: destination file, written with the ``mp4v`` codec at the
            input's frame rate and resolution.
    """
    # Parse the annotations first so a bad XML file fails before any output is created.
    tire_annotations = load_tire_annotations(annotations_path)

    # Line width shared by prediction and ground-truth boxes. It is defined
    # before the loop: it used to be assigned only once a frame with
    # predictions had been seen, so drawing ground truth earlier than that
    # raised a NameError.
    thickness = 3
    pred_color = (0, 0, 255)  # red in BGR
    gt_color = (0, 255, 0)    # green in BGR

    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # codec to MP4

    out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
    pbar = tqdm(total=frame_count, desc=f'Processing Frames {video_path}')

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Position is queried after read(), hence the -1 to get the index
            # of the frame that was just decoded.
            frame_num = int(cap.get(cv2.CAP_PROP_POS_FRAMES) - 1)
            frame_key = 'frame_' + str(frame_num).zfill(5)

            if frame_key in preds_frames:
                instances = preds_frames[frame_key].get('instances')
                if instances:
                    boxes = instances.pred_boxes.tensor.cpu().numpy()
                    scores = instances.scores.cpu().numpy()

                    for box, score in zip(boxes, scores):
                        # Plain Python ints are accepted by every OpenCV build.
                        x1, y1, x2, y2 = box.astype(int).tolist()
                        cv2.rectangle(frame, (x1, y1), (x2, y2), pred_color, thickness)
                        cv2.putText(frame, f'{score:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                                    pred_color, thickness)

            for box_info in tire_annotations.get(frame_num, []):
                xtl = int(float(box_info['xtl']))
                ytl = int(float(box_info['ytl']))
                xbr = int(float(box_info['xbr']))
                ybr = int(float(box_info['ybr']))
                cv2.rectangle(frame, (xtl, ytl), (xbr, ybr), gt_color, thickness)

            frame = cv2.resize(frame, (frame_width, frame_height))
            out.write(frame)
            pbar.update(1)
    finally:
        # Release everything even if drawing or decoding fails midway.
        pbar.close()
        cap.release()
        out.release()
        cv2.destroyAllWindows()

### generate_detection_video writes a copy of the video with the detected bounding boxes drawn on it (no ground truth)
def generate_detection_video(video_path, preds_frames, output_path):
    """Render the predicted boxes and their scores onto a video.

    ``preds_frames`` is looked up with ``'frame_00042'``-style keys; each hit
    is expected to hold an ``'instances'`` entry exposing
    ``pred_boxes.tensor``, ``scores`` and ``pred_classes``. The result is
    written to ``output_path`` with the ``mp4v`` codec, at the frame rate and
    resolution reported for the input.
    """
    box_color = (0, 0, 255)
    line_width = 2

    capture = cv2.VideoCapture(video_path)
    width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_rate = capture.get(cv2.CAP_PROP_FPS)
    n_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    codec = cv2.VideoWriter_fourcc(*'mp4v')  # codec to MP4

    writer = cv2.VideoWriter(output_path, codec, frame_rate, (width, height))

    progress = tqdm(total=n_frames, desc=f'Processing Frames {video_path}')

    while capture.isOpened():
        ok, frame = capture.read()
        if not ok:
            break

        # The position is read after read(); subtracting one gives the index
        # used to build the prediction key for the frame just decoded.
        current_index = int(capture.get(cv2.CAP_PROP_POS_FRAMES) - 1)
        frame_key = 'frame_' + str(current_index).zfill(5)

        instances = preds_frames[frame_key].get('instances') if frame_key in preds_frames else None
        if instances:
            boxes = instances.pred_boxes.tensor.cpu().numpy()
            scores = instances.scores.cpu().numpy()
            classes = instances.pred_classes.cpu().numpy()

            for raw_box, score, _class_id in zip(boxes, scores, classes):
                x1, y1, x2, y2 = raw_box.astype(int)
                cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, line_width)
                cv2.putText(frame, f'{score:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                            box_color, line_width)

        writer.write(cv2.resize(frame, (width, height)))
        progress.update(1)

    progress.close()
    capture.release()
    writer.release()
    cv2.destroyAllWindows()

### calculates the IoU score between 2 bounding boxes: the ground truth and the predicted one
def calculate_iou(ground_truth, pred):
    """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Widths and heights are computed as ``max - min + 1``, i.e. both corner
    coordinates are treated as belonging to the box.
    """
    gt_x1, gt_y1, gt_x2, gt_y2 = ground_truth[0], ground_truth[1], ground_truth[2], ground_truth[3]
    pd_x1, pd_y1, pd_x2, pd_y2 = pred[0], pred[1], pred[2], pred[3]
    zero = np.array(0.)

    # Overlap rectangle; a negative extent means the boxes do not touch and is clipped to 0.
    overlap_h = np.maximum(np.minimum(gt_y2, pd_y2) - np.maximum(gt_y1, pd_y1) + 1, zero)
    overlap_w = np.maximum(np.minimum(gt_x2, pd_x2) - np.maximum(gt_x1, pd_x1) + 1, zero)
    overlap_area = overlap_h * overlap_w

    gt_area = (gt_y2 - gt_y1 + 1) * (gt_x2 - gt_x1 + 1)
    pd_area = (pd_y2 - pd_y1 + 1) * (pd_x2 - pd_x1 + 1)

    return overlap_area / (gt_area + pd_area - overlap_area)

### reads the "tire" annotations of a CVAT .xml file
def read_annotations(xml_file):
    """Return a ``defaultdict(list)`` mapping frame number to tire boxes.

    Each entry is ``{'label': 'tire', 'box': [xtl, ytl, xbr, ybr]}`` with the
    coordinates converted to ``float``. Tracks with any other label are
    skipped.
    """
    per_frame = defaultdict(list)
    xml_root = ET.parse(xml_file).getroot()

    for track in xml_root.findall('.//track'):
        label = track.attrib['label']
        if label != "tire":
            continue
        for box in track.findall('.//box'):
            attrs = box.attrib
            frame = int(attrs['frame'])
            corners = [float(attrs[name]) for name in ('xtl', 'ytl', 'xbr', 'ybr')]
            per_frame[frame].append({'label': label, 'box': corners})

    return per_frame


### returns TP, FP, FN, Pr, Rc, F1 for preds_frames (per-frame predictions) and annotations (from the .xml CVAT file)
def calculate_metrics(preds_frames, annotations, iou_threshold=0.5):
    """Compute detection metrics with one-to-one box matching.

    Within each frame the predictions are visited in descending score order
    and each one is paired with the still-unmatched ground-truth box of
    highest IoU, provided that IoU is strictly above ``iou_threshold``. A
    ground-truth box can therefore validate at most one prediction: extra
    detections of the same object count as false positives instead of
    inflating TP and hiding missed objects.

    Args:
        preds_frames: mapping from ``'frame_<n>'``-style keys to a mapping
            whose ``'instances'`` entry answers ``.get('pred_boxes')`` and
            ``.get('scores')``.
        annotations: mapping from integer frame number to a list of
            ``{'box': [xtl, ytl, xbr, ybr], ...}`` entries, as built by
            ``read_annotations``.
        iou_threshold: minimum IoU (exclusive) for a match; defaults to 0.5.

    Returns:
        ``(TP, FP, FN, precision, recall, f1_score)``. Each ratio is 0 when
        its denominator is 0.

    NOTE(review): only frames present in ``preds_frames`` are visited, so
    annotations on frames missing from it add nothing to FN -- confirm that
    the caller always provides an entry for every frame.
    """
    TP = FP = FN = 0

    for frame, preds in preds_frames.items():
        preds_boxes = preds['instances'].get('pred_boxes')
        preds_scores = preds['instances'].get('scores')

        if preds_boxes is None:
            continue

        preds_boxes = preds_boxes.tensor.cpu().numpy()
        # The frame number is the numeric suffix of the key.
        annotations_frame = annotations.get(int(frame.split('_')[-1]), [])

        if preds_scores is not None:
            # Stable sort keeps the original order among equal scores.
            visit_order = np.argsort(-preds_scores.cpu().numpy(), kind='stable')
        else:
            visit_order = range(len(preds_boxes))

        unmatched = list(range(len(annotations_frame)))
        matched = 0

        for pred_idx in visit_order:
            best_idx = None
            best_iou = iou_threshold
            for annotation_idx in unmatched:
                iou = calculate_iou(preds_boxes[pred_idx], annotations_frame[annotation_idx]['box'])
                if iou > best_iou:
                    best_idx, best_iou = annotation_idx, iou
            if best_idx is not None:
                # Consume the ground-truth box so it cannot be matched again.
                unmatched.remove(best_idx)
                matched += 1

        TP += matched
        FP += len(preds_boxes) - matched
        FN += len(annotations_frame) - matched

    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    return TP, FP, FN, precision, recall, f1_score

### load_predictions loads the cached .pkl predictions of a video (or computes and caches them) and returns them after score filtering
def load_predictions(fold, obj, model_iter, video_name, model_name):
    """Return the predictions for ``video_name`` filtered at score 0.9.

    The cache file lives under
    ``../output/tire_test/{model_name}/mbg_{fold}_{obj}/``. When it is
    missing, a ``DefaultPredictor`` is built from the ``{model_name}.yaml``
    config with the ``model_{model_iter}.pth`` weights found in that same
    directory, run on every frame of the video, and the result is pickled to
    the cache file.

    The value returned is whatever ``filter_preds_score_video`` produces for
    a shallow copy of the predictions and a threshold of 0.9
    (presumably a dict keyed by frame -- confirm in codes.utils.evaluation).
    """
    video_path = f'../../data/v1_full/videos/{video_name}.avi'
    # The object itself is not used below; the constructor call is kept as in the original.
    vid = videoObj(video_path)
    pred_file = f"../output/tire_test/{model_name}/mbg_{fold}_{obj}/{video_name}_preds_model_thres_{model_iter}.pkl"

    if not os.path.isfile(pred_file):
        print(f"computing predictions {video_name}...")

        config_file = f"../codes/configs/mosquitoes/{model_name}.yaml"
        weights_dir = os.path.dirname(pred_file)

        cfg = get_cfg()
        cfg.merge_from_file(config_file)
        cfg.MODEL.WEIGHTS = os.path.join(weights_dir, f"model_{model_iter}.pth")
        cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.9  # set the testing threshold for this model
        cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1
        cfg.MODEL.RPN.PRE_NMS_TOPK_TRAIN = 600
        cfg.MODEL.RPN.PRE_NMS_TOPK_TEST = 300
        cfg.MODEL.RPN.POST_NMS_TOPK_TRAIN = 50
        cfg.MODEL.RPN.POST_NMS_TOPK_TEST = 50

        print(f"weights: {cfg.MODEL.WEIGHTS}")

        predictor = DefaultPredictor(cfg)
        preds = run_on_video(video_path, predictor, every=1)

        print(f"saving predictions {video_name}...")
        with open(pred_file, 'wb') as cache:
            pickle.dump(preds, cache)
        print("done!")
    else:
        print(f"loading predictions {video_name}...")
        with open(pred_file, 'rb') as cache:
            preds = pickle.load(cache)
        print("done!")

    return filter_preds_score_video(preds.copy(), 0.9)

Tests

In [None]:
### single video: evaluate the tire detector on one video and print its metrics

# These values select the cache file / model weights that load_predictions reads.
fold = 'train+val'
obj = 'tire'
model_iter = 'final'
video_name = 'video13'
model_name = 'faster_rcnn_R_50_FPN_1x'
# preds_frames stays defined as a notebook global and is read again by the next cell.
preds_frames = load_predictions(fold, obj, model_iter, video_name, model_name)
annotations = read_annotations(f'../../data/v1_full/annotations/{video_name}.xml')

TP, FP, FN, precision, recall, f1_score = calculate_metrics(preds_frames, annotations)

print("TP:", TP)
print("FP:", FP)
print("FN:", FN)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)
# TP + FP is the number of predictions that calculate_metrics counted.
print(f'TP + FP: {TP+FP}')

In [None]:
# Just checking if metrics match with total number of detections:
# sums len() of every frame's 'instances' entry in the global preds_frames
# (set by whichever evaluation cell ran last) so the total can be compared
# with the TP + FP value printed by the single-video cell.

total_detections = 0
for frame_data in preds_frames.values():
    total_detections += len(frame_data['instances'])

print("Total detections:", total_detections)

In [None]:
### all videos: run the same evaluation for each video in the list and print its metrics

videos = ['video01', 'video02', 'video03', 'video04', 'video05', 'video06', 'video07', 'video08', 'video09', 'video10', 'video11','video12',  'video13']
fold = 'train+val'
obj = 'tire'
model_iter = 'final'
model_name = 'faster_rcnn_R_50_FPN_1x'

for video in videos:
    # preds_frames / annotations are overwritten on every iteration, so after
    # the loop they hold the data of the last video only.
    preds_frames = load_predictions(fold, obj, model_iter, video, model_name)
    annotations = read_annotations(f'../../data/v1_full/annotations/{video}.xml')

    TP, FP, FN, precision, recall, f1_score = calculate_metrics(preds_frames, annotations)

    # The video name is not printed here; it appears only in the messages
    # emitted by load_predictions just before each block of metrics.
    print("TP:", TP)
    print("FP:", FP)
    print("FN:", FN)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1 Score:", f1_score)
    print('-'*20)

In [None]:
import os
import random
import xml.etree.ElementTree as ET
import cv2

# Function to load the boxes of the requested labels from a CVAT XML file
def load_annotations(xml_path, objects):
    """Return ``{label: {frame_number: [box, ...]}}`` for the labels in ``objects``.

    Every label in ``objects`` gets an entry, even when no track carries it.
    Each ``box`` is a dict with the float coordinates ``xtl``, ``ytl``,
    ``xbr`` and ``ybr``.
    """
    by_label = {name: {} for name in objects}
    xml_root = ET.parse(xml_path).getroot()

    for track in xml_root.findall('.//track'):
        label = track.attrib['label']
        if label not in objects:
            continue

        frames = by_label[label]
        for box in track.findall('.//box'):
            frame_index = int(box.attrib['frame'])
            frames.setdefault(frame_index, []).append(
                {corner: float(box.attrib[corner]) for corner in ('xtl', 'ytl', 'xbr', 'ybr')}
            )

    return by_label

# Function to save crops of annotated objects taken from randomly chosen frames of a single video
def generate_random_images(video_path, xml_path, objects, output_dir, num_images_per_object=5):
    """Write object crops from random frames to ``output_dir/<video>/<label>/``.

    For each label in ``objects``, ``num_images_per_object`` frame indices are
    drawn uniformly from the whole video. A drawn frame only produces files
    when it carries annotations for that label (one crop per box), so fewer
    images than requested -- possibly none -- may be written.

    Args:
        video_path: video to sample; its base name (without extension) names
            the output sub-directory and prefixes the file names.
        xml_path: CVAT XML file read by ``load_annotations``.
        objects: labels to extract.
        output_dir: root directory for the generated crops.
        num_images_per_object: number of random draws per label.
    """
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    os.makedirs(os.path.join(output_dir, video_name), exist_ok=True)

    # Load annotations from XML
    annotations = load_annotations(xml_path, objects)

    cap = cv2.VideoCapture(video_path)
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        for obj in objects:
            obj_dir = os.path.join(output_dir, video_name, obj)
            os.makedirs(obj_dir, exist_ok=True)

            if frame_count <= 0:
                # Unreadable or empty video: random.randint(0, -1) would raise ValueError.
                continue

            obj_annotations = annotations[obj]

            for _ in range(num_images_per_object):
                frame_num = random.randint(0, frame_count - 1)

                # Skip the seek/decode when the drawn frame has nothing to crop.
                if frame_num not in obj_annotations:
                    continue

                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
                ret, frame = cap.read()
                if not ret:
                    continue

                for annotation in obj_annotations[frame_num]:
                    xtl = int(annotation['xtl'])
                    ytl = int(annotation['ytl'])
                    xbr = int(annotation['xbr'])
                    ybr = int(annotation['ybr'])

                    # Clamp to the frame origin: a negative start index would
                    # wrap around and crop the wrong region.
                    cropped_frame = frame[max(ytl, 0):ybr, max(xtl, 0):xbr]
                    if cropped_frame.size == 0:
                        # Degenerate box; cv2.imwrite rejects empty images.
                        continue

                    # The file name encodes the frame number and the annotated box corners.
                    output_file = os.path.join(obj_dir, f'{video_name}_frame_{frame_num}_{xtl}_{ytl}_{xbr}_{ybr}.jpg')
                    cv2.imwrite(output_file, cropped_frame)
    finally:
        # Release the capture even if decoding or writing fails.
        cap.release()

# Example usage: crop 'bottle' annotations from random frames of video01.
# NOTE: the two input paths are absolute paths on the author's machine and must be adapted.
if __name__ == "__main__":
    video_path = '/nfs/proc/isabelle.melo/Mosquitoes/dataset/v1/videos/video01.avi'
    xml_path = '/home/isabelle.melo/proc/Mosquitoes/dataset/v1/annotations-xml/video01.xml'
    # Relative path: crops are written under the current working directory.
    output_dir = 'generated_images'
    objects = ['bottle']
    
    # Generate random images with annotations for the specified video
    # (num_images_per_object keeps its default of 5 random draws per label).
    generate_random_images(video_path, xml_path, objects, output_dir)


In [None]:
import os
import random
import xml.etree.ElementTree as ET
import cv2

# Function to read, from a CVAT XML file, the boxes of every requested label
def load_annotations(xml_path, objects):
    """Group the annotated boxes by label and then by frame number.

    Returns a dict with one key per entry of ``objects``; each value maps an
    integer frame number to the list of boxes on that frame, every box being
    ``{'xtl': float, 'ytl': float, 'xbr': float, 'ybr': float}``.
    """
    result = {}
    for wanted in objects:
        result[wanted] = {}

    tree_root = ET.parse(xml_path).getroot()
    for track in tree_root.findall('.//track'):
        label = track.attrib['label']
        if label in objects:
            for box in track.findall('.//box'):
                attrs = box.attrib
                frame_no = int(attrs['frame'])
                boxes_here = result[label].setdefault(frame_no, [])
                boxes_here.append({
                    'xtl': float(attrs['xtl']),
                    'ytl': float(attrs['ytl']),
                    'xbr': float(attrs['xbr']),
                    'ybr': float(attrs['ybr']),
                })

    return result

# Function to save one crop per annotated box for every frame where the object appears
def generate_images_with_object(video_path, xml_path, object_name, output_dir):
    """Write a JPEG crop of each ``object_name`` box directly into ``output_dir``.

    Args:
        video_path: video to read; its base name (without extension) prefixes
            the output file names.
        xml_path: CVAT XML file read by ``load_annotations``.
        object_name: label whose boxes are cropped.
        output_dir: directory receiving the crops (created when missing).
    """
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    os.makedirs(output_dir, exist_ok=True)

    # Load annotations from XML
    obj_annotations = load_annotations(xml_path, [object_name])[object_name]

    cap = cv2.VideoCapture(video_path)
    try:
        for frame_num, frame_boxes in obj_annotations.items():
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()

            if not ret:
                # Frame could not be decoded (e.g. index past the end of the video).
                continue

            for annotation in frame_boxes:
                xtl = int(annotation['xtl'])
                ytl = int(annotation['ytl'])
                xbr = int(annotation['xbr'])
                ybr = int(annotation['ybr'])

                # Clamp to the frame origin: a negative start index would
                # wrap around and crop the wrong region.
                cropped_frame = frame[max(ytl, 0):ybr, max(xtl, 0):xbr]
                if cropped_frame.size == 0:
                    # Degenerate box; cv2.imwrite rejects empty images.
                    continue

                # The file name encodes the frame number and the annotated box corners.
                output_file = os.path.join(output_dir, f'{video_name}_frame_{frame_num}_{xtl}_{ytl}_{xbr}_{ybr}.jpg')
                cv2.imwrite(output_file, cropped_frame)
    finally:
        # Release the capture even if decoding or writing fails.
        cap.release()

# Example usage: crop every 'bottle' annotation of video01.
# NOTE: the two input paths are absolute paths on the author's machine and must be adapted.
if __name__ == "__main__":
    video_path = '/nfs/proc/isabelle.melo/Mosquitoes/dataset/v1/videos/video01.avi'
    xml_path = '/home/isabelle.melo/proc/Mosquitoes/dataset/v1/annotations-xml/video01.xml'
    # Relative path: crops are written under the current working directory.
    output_dir = 'generated_images'
    object_name = 'bottle'
    
    # Generate images with annotations for frames where the object appears
    generate_images_with_object(video_path, xml_path, object_name, output_dir)


In [None]:
import os
import random
import xml.etree.ElementTree as ET
import cv2

# Function to extract the per-frame boxes of the given labels from a CVAT XML file
def load_annotations(xml_path, objects):
    """Parse ``xml_path`` and return ``{label: {frame: [box, ...]}}``.

    Only tracks whose ``label`` attribute is listed in ``objects`` are kept;
    labels without any track map to an empty dict. A ``box`` holds the four
    corner coordinates as floats under the keys ``xtl``, ``ytl``, ``xbr`` and
    ``ybr``.
    """
    collected = {label_name: {} for label_name in objects}

    for track in ET.parse(xml_path).getroot().findall('.//track'):
        track_label = track.attrib['label']
        if track_label not in objects:
            continue

        for box in track.findall('.//box'):
            frame_id = int(box.attrib['frame'])
            per_frame = collected[track_label]
            if frame_id not in per_frame:
                per_frame[frame_id] = []
            corners = {}
            for key in ('xtl', 'ytl', 'xbr', 'ybr'):
                corners[key] = float(box.attrib[key])
            per_frame[frame_id].append(corners)

    return collected

# Function to save one crop per annotated box for every frame where the object appears
def generate_images_with_object(video_path, xml_path, object_name, output_dir):
    """Write PNG crops of ``object_name`` to ``output_dir/<video>/<object_name>/``.

    The first box of a frame is saved as ``<video>_<frame>.png``; any further
    box on the same frame gets an index suffix (``<video>_<frame>_1.png``,
    ...). Without the suffix every box of a frame shared one file name and
    only the last crop survived.

    Args:
        video_path: video to read; its base name (without extension) names
            the output sub-directory and prefixes the file names.
        xml_path: CVAT XML file read by ``load_annotations``.
        object_name: label whose boxes are cropped.
        output_dir: root directory for the generated crops.
    """
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    object_dir = os.path.join(output_dir, video_name, object_name)
    os.makedirs(object_dir, exist_ok=True)

    # Load annotations from XML
    obj_annotations = load_annotations(xml_path, [object_name])[object_name]

    cap = cv2.VideoCapture(video_path)
    try:
        for frame_num, frame_boxes in obj_annotations.items():
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()

            if not ret:
                # Frame could not be decoded (e.g. index past the end of the video).
                continue

            for box_idx, annotation in enumerate(frame_boxes):
                xtl = int(annotation['xtl'])
                ytl = int(annotation['ytl'])
                xbr = int(annotation['xbr'])
                ybr = int(annotation['ybr'])

                # Clamp to the frame origin: a negative start index would
                # wrap around and crop the wrong region.
                cropped_frame = frame[max(ytl, 0):ybr, max(xtl, 0):xbr]
                if cropped_frame.size == 0:
                    # Degenerate box; cv2.imwrite rejects empty images.
                    continue

                # Keep the historical name for the first box, disambiguate the others.
                suffix = '' if box_idx == 0 else f'_{box_idx}'
                output_file = os.path.join(object_dir, f'{video_name}_{frame_num}{suffix}.png')
                cv2.imwrite(output_file, cropped_frame)
    finally:
        # Release the capture even if decoding or writing fails.
        cap.release()

# Example usage: crop every 'watertank' annotation of video10.
# NOTE: the two input paths are absolute paths on the author's machine and must be adapted.
if __name__ == "__main__":
    video_path = '/nfs/proc/isabelle.melo/Mosquitoes/dataset/v1/videos/video10.avi'
    xml_path = '/home/isabelle.melo/proc/Mosquitoes/dataset/v1/annotations-xml/video10.xml'
    # Relative path: crops are written under the current working directory.
    output_dir = 'generated_images'
    object_name = 'watertank'
    
    # Generate images with annotations for frames where the object appears
    generate_images_with_object(video_path, xml_path, object_name, output_dir)