#### Dependencies

In [1]:
# Importing dependencies
import pandas as pd
import torch
import torchvision
from torchvision import transforms
from PIL import Image
import zipfile
import tarfile
import numpy as np
import os
import matplotlib.pyplot as plt

#### Name constants

In [2]:
# Pascal VOC class name -> COCO category name.
# Every value must appear verbatim in `coco_names` below, because the COCO
# label index is later looked up with `coco_names.index(...)`.
pascal_to_coco = {
    "aeroplane": "airplane",
    "bicycle": "bicycle",
    "bird": "bird",
    "boat": "boat",
    "bottle": "bottle",
    "bus": "bus",
    "car": "car",
    "cat": "cat",
    "chair": "chair",
    "cow": "cow",
    "diningtable": "dining table",
    "dog": "dog",
    "horse": "horse",
    "motorbike": "motorcycle",
    "person": "person",
    "pottedplant": "potted plant",
    "sheep": "sheep",
    "sofa": "couch",
    "train": "train",
    # The list below names this category 'tv'; the previous value "monitor"
    # is not in the list and made `coco_names.index` raise ValueError.
    "tvmonitor": "tv"
}

# Category names indexed by the integer label the detector returns.
# 'N/A' entries are placeholders that keep the remaining names at their index.
coco_names = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane',
    'bus', 'train', 'truck', 'boat', 'traffic light', 'fire hydrant',
    'N/A', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog',
    'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe',
    'N/A', 'backpack', 'umbrella', 'N/A', 'N/A', 'handbag', 'tie',
    'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite',
    'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
    'tennis racket', 'bottle', 'N/A', 'wine glass', 'cup', 'fork',
    'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange',
    'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair',
    'couch', 'potted plant', 'bed', 'N/A', 'dining table', 'N/A', 'N/A',
    'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard',
    'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator',
    'N/A', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
    'toothbrush'
]

#### Setting up the model

In [3]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Running on device: {device}")

# Load the pretrained Faster R-CNN (ResNet-50 + FPN backbone), move it to the
# chosen device and switch it to evaluation mode for inference
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model = model.to(device)
model.eval()

Running on device: cuda




FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

#### Eye tracking data processing

In [4]:
# Function for processing eye tracking data
def process_eye_tracking_data(CSV_PATH):
    """Summarise the per-participant responses of every image in the CSV.

    The CSV is read with pandas and must provide the columns `image_file`,
    `class` and `response`. Rows are grouped by (`image_file`, `class`).

    Returns:
        dict keyed by `image_file`. Each value holds:
          - "responses": the list of responses in that group,
          - "mv": the mean response rounded with Python's `round()`
            (so an exact 0.5 tie becomes 0),
          - "agreement_ratio": the mean response,
          - "class": the group's class value.
        The key is the image file alone, so if one image occurs under several
        classes the last group processed overwrites the earlier ones.
    """
    responses_by_group = (
        pd.read_csv(CSV_PATH).groupby(["image_file", "class"])["response"]
    )

    image_data = {}
    for (img, target_class), group in responses_by_group:
        votes = list(group)
        positive_share = sum(votes) / len(votes)

        image_data[img] = {
            "responses": votes,
            "mv": int(round(positive_share)),
            "agreement_ratio": positive_share,
            "class": target_class,
        }

    return image_data

#### Prediction function

In [5]:
# Function for prediction
def predict(input_tensor, model, device, detection_threshold):
    """Run the detector on one batch and keep detections above a score threshold.

    Only the first element of the model output (`outputs[0]`) is read.

    Args:
        input_tensor: batch passed unchanged to `model`.
        model: callable returning a list of dicts with the keys 'labels',
            'scores' and 'boxes'.
        device: unused here; kept so existing callers keep working.
        detection_threshold: detections with score >= this value are kept.

    Returns:
        (boxes, classes, labels, indices):
          - boxes: int32 array of shape (k, 4), box coordinates truncated to
            integers; shape (0, 4) when nothing passes the threshold,
          - classes: list of `coco_names` entries for the kept labels,
          - labels: list of the kept integer labels,
          - indices: positions of the kept detections in the raw output.
    """
    # Inference only: skip autograd bookkeeping so no graph is built or retained
    with torch.no_grad():
        outputs = model(input_tensor)

    detections = outputs[0]
    pred_labels = detections['labels'].detach().cpu().numpy()
    pred_scores = detections['scores'].detach().cpu().numpy()
    pred_bboxes = detections['boxes'].detach().cpu().numpy()

    # Boolean mask of detections that reach the threshold
    keep = pred_scores >= detection_threshold
    indices = [index for index, kept in enumerate(keep) if kept]

    # reshape keeps the (k, 4) layout even when no detection survives
    boxes = pred_bboxes[keep].astype(np.int32).reshape(-1, 4)
    labels = [pred_labels[index] for index in indices]
    classes = [coco_names[label] for label in labels]

    return boxes, classes, labels, indices

#### Function for retrieving annotation

In [6]:
def find_annotation(image_name, TAR_PATH, ANNOTATION_PATH="VOCdevkit/VOC2012/Annotations"):
    """Read one annotation XML file out of a tar archive.

    Args:
        image_name: annotation file name without the `.xml` extension.
        TAR_PATH: path of the tar archive to open.
        ANNOTATION_PATH: directory inside the archive that holds the XML files.

    Returns:
        The file content decoded as UTF-8, or None when the member is missing,
        is not a regular file, or reading it fails (the failure is printed).
    """
    target_file = f"{ANNOTATION_PATH}/{image_name}.xml"

    # The archive is opened on every call and closed again before returning
    with tarfile.open(TAR_PATH, "r") as archive:
        try:
            entry = archive.getmember(target_file)
            if not entry.isfile():
                return None
            with archive.extractfile(entry) as handle:
                return handle.read().decode("utf-8")
        except KeyError:
            # getmember raises KeyError when the name is not in the archive
            print(f"Error: {target_file} not found in the tar archive.")
            return None
        except Exception as e:
            print(f"Unexpected error: {e}")
            return None

#### Function for extracting bounding boxes

In [7]:
import xml.etree.ElementTree as ET

def extract_bboxes_from_annotation(xml_content: str, class_name: str): # Changed from coco class
    """Collect the bounding boxes of every object of one class in an annotation.

    Args:
        xml_content: annotation XML as a string; `<object>` elements are
            expected directly under the root element.
        class_name: object name to keep, compared case-insensitively after
            stripping surrounding whitespace from the XML value.

    Returns:
        List of dicts with integer 'xmin', 'ymin', 'xmax', 'ymax'. Empty when
        the XML cannot be parsed or no object matches. Objects without a
        usable `<name>` or `<bndbox>` are skipped; boxes whose coordinates are
        missing or not integers are reported and skipped.
    """
    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError as e:
        print(f"Error parsing XML: {e}")
        return []

    wanted = class_name.lower()
    bboxes = []

    # Iterate over all object elements in the XML
    for obj in root.findall('object'):
        name_node = obj.find('name')
        # A missing or empty <name> cannot match any class; skip it instead of
        # failing with AttributeError on None
        if name_node is None or name_node.text is None:
            continue
        if name_node.text.strip().lower() != wanted: # Changed from coco class
            continue

        bndbox = obj.find('bndbox')
        if bndbox is None:
            continue

        try:
            bboxes.append({
                'xmin': int(bndbox.find('xmin').text),
                'ymin': int(bndbox.find('ymin').text),
                'xmax': int(bndbox.find('xmax').text),
                'ymax': int(bndbox.find('ymax').text)
            })
        except (AttributeError, ValueError) as e:
            print(f"Error extracting bounding box coordinates: {e}")
            continue

    return bboxes

#### Function for calculating the intersection over union

In [8]:
def calculate_iou(boxA, boxB):
    """Return the intersection-over-union of two boxes.

    Both boxes are dicts with the keys 'xmin', 'ymin', 'xmax', 'ymax'.
    Widths and heights are computed as `max - min + 1`, i.e. the max
    coordinates are counted as part of the box.
    """
    def _area(box):
        # Box area with the same inclusive "+ 1" extent used for the overlap
        return (box['xmax'] - box['xmin'] + 1) * (box['ymax'] - box['ymin'] + 1)

    # Corners of the overlapping rectangle
    left = max(boxA['xmin'], boxB['xmin'])
    top = max(boxA['ymin'], boxB['ymin'])
    right = min(boxA['xmax'], boxB['xmax'])
    bottom = min(boxA['ymax'], boxB['ymax'])

    # Clamp at zero so boxes that do not overlap contribute no area
    overlap_w = max(0, right - left + 1)
    overlap_h = max(0, bottom - top + 1)
    intersection = overlap_w * overlap_h

    union = _area(boxA) + _area(boxB) - intersection
    return intersection / float(union)


#### Main function

In [9]:
def main(ZIP_PATH_POET: str, TAR_PATH_ANNOTATION: str, CSV_PATH: str):
    """Compare the detector's result with the eye-tracking majority vote per image.

    For every image in the eye-tracking CSV the function looks up the image in
    the zip archive and its annotation in the tar archive, runs the detector,
    and marks the image as detected when any predicted box of the target class
    overlaps a ground-truth box of that class with IoU >= 0.5.

    Relies on the notebook-level names `model`, `device`, `pascal_to_coco` and
    `coco_names`.

    Args:
        ZIP_PATH_POET: path of the zip archive containing the images.
        TAR_PATH_ANNOTATION: path of the tar archive containing the annotations.
        CSV_PATH: path of the eye-tracking CSV.

    Returns:
        (eye_tracking_data, combined_data). `combined_data` is a list with one
        dict per image found in the zip. When no annotation is available,
        "model_detection" and "agreement" are None. Images missing from the
        zip are reported and skipped.
    """
    IOU_THRESHOLD = 0.5  # minimum overlap for a prediction to count as a hit

    # Retrieve the eye tracking data
    eye_tracking_data = process_eye_tracking_data(CSV_PATH=CSV_PATH)
    print(f"Length of eye tracking data: {len(eye_tracking_data)}")

    # Variable for the final data
    combined_data = []

    # The image-to-tensor conversion is the same for every image; build it once
    transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])

    # Go through the zipfile
    with zipfile.ZipFile(ZIP_PATH_POET, "r") as zr:
        zip_names = zr.namelist()
        # A set makes the per-image existence check O(1) instead of a list scan
        zip_members = set(zip_names)
        print(f"# Number of files in zip: {len(zip_names)}")

        # Go through images processed in the eye tracking data
        for idx, (image_name, data) in enumerate(eye_tracking_data.items()):
            # Status print
            if idx % 200 == 0:
                print(f"Processing image {idx + 1} of {len(eye_tracking_data)}")

            # Retrieving class and creating path
            class_name = data["class"].lower()
            coco_class = pascal_to_coco[class_name]
            FULL_PATH = f"POETdataset/PascalImages/{class_name}_{image_name}.jpg"

            # Check if the full path is in zip path
            if FULL_PATH not in zip_members:
                print(f"Image {FULL_PATH} not found in zip")
                continue

            # Result row; the two model fields stay None unless a ground truth exists
            record = {
                "image_name": f"{class_name}_{image_name}",
                "responses": data['responses'],
                "majority_vote": data['mv'],
                "agreement_ratio": data['agreement_ratio'],
                "model_detection": None,
                "agreement": None,
                "class": data['class'],
                "zip_file_path": "POETdataset.zip/" + FULL_PATH
            }

            # Look up the annotation first: without ground truth there is
            # nothing to score, so the expensive model call can be skipped
            annotations = find_annotation(
                image_name=image_name,
                TAR_PATH=TAR_PATH_ANNOTATION,
            )
            if not annotations:
                combined_data.append(record)
                continue
            gtbb = extract_bboxes_from_annotation(annotations, class_name)

            # Load the image and convert it while the zip member is still open
            with zr.open(FULL_PATH) as file:
                # The model's first convolution takes 3 input channels
                image = Image.open(file).convert("RGB")
                input_tensor = transform(image).to(device).unsqueeze(0)

            # Threshold 0.0 keeps every box the model returns
            boxes, classes, labels, indices = predict(input_tensor, model, device, 0.0)

            # Keep only the boxes predicted for the target class, as dicts
            class_index = coco_names.index(coco_class)
            predicted_boxes = [
                {
                    "xmin": int(box[0]),
                    "ymin": int(box[1]),
                    "xmax": int(box[2]),
                    "ymax": int(box[3])
                }
                for box, label in zip(boxes, labels)
                if label == class_index
            ]

            # 1 as soon as any ground-truth / predicted pair overlaps enough
            detection_flag = int(any(
                calculate_iou(gtb, pb) >= IOU_THRESHOLD
                for gtb in gtbb
                for pb in predicted_boxes
            ))

            # Populating the data structure
            record["model_detection"] = detection_flag
            record["agreement"] = detection_flag == data['mv']
            combined_data.append(record)

    # Return statement
    return eye_tracking_data, combined_data

In [10]:
# Input locations: image archive, annotation archive and eye-tracking CSV
ZIP_PATH_POET = "POETdataset.zip"
TAR_PATH_ANNOTATION = "VOCtrainval_11-May-2012.tar"
CSV_PATH = "../eye_tracking_data.csv"

# Printing the paths for confirmation
print(f"Zip path for poet dataset: {ZIP_PATH_POET}\nZip path for annotations: {TAR_PATH_ANNOTATION}\nCsv path for eye tracking data: {CSV_PATH}")

# Run the full comparison over every image listed in the CSV
eye_tracking_data, combined_data = main(ZIP_PATH_POET, TAR_PATH_ANNOTATION, CSV_PATH)

Zip path for poet dataset: POETdataset.zip
Zip path for annotations: VOCtrainval_11-May-2012.tar
Csv path for eye tracking data: ../eye_tracking_data.csv
Length of eye tracking data: 6131
# Number of files in zip: 6311
Processing image 1 of 6131
Processing image 201 of 6131
Processing image 401 of 6131
Processing image 601 of 6131
Processing image 801 of 6131
Processing image 1001 of 6131
Processing image 1201 of 6131
Processing image 1401 of 6131
Processing image 1601 of 6131
Processing image 1801 of 6131
Processing image 2001 of 6131
Processing image 2201 of 6131
Processing image 2401 of 6131
Processing image 2601 of 6131
Processing image 2801 of 6131
Processing image 3001 of 6131
Processing image 3201 of 6131
Processing image 3401 of 6131
Processing image 3601 of 6131
Processing image 3801 of 6131
Processing image 4001 of 6131
Processing image 4201 of 6131
Processing image 4401 of 6131
Processing image 4601 of 6131
Processing image 4801 of 6131
Processing image 5001 of 6131
Processi

In [11]:
# One row per processed image
combined_data_df = pd.DataFrame(combined_data)

# Persist the table without the index column
combined_data_df.to_csv('combined_data_07_with_gtbb_correct.csv', index=False)
print("Dataframe saved successfully")

# Preview the first rows. A notebook only renders the value of a cell's last
# expression, so the preview has to come last to be displayed.
combined_data_df.head()

Dataframe saved successfully


In [12]:
from sklearn.metrics import f1_score

def calculate_accuracy_and_f1(combined_data):
    """Score the model and the eye-tracking majority vote against "class present".

    The true label is taken to be 1 for every record, so accuracy is the share
    of predictions equal to 1 and F1 is computed against an all-ones target.

    Args:
        combined_data: iterable of dicts with the keys "model_detection"
            (1, 0 or None) and "majority_vote" (1 or 0).

    Returns:
        dict with "model_accuracy", "eye_tracking_accuracy", "model_f1" and
        "eye_tracking_f1". Records whose "model_detection" is None are left
        out of the model metrics only. A metric with no usable records is NaN.
    """
    def _score(predictions):
        # Accuracy and F1 of `predictions` against an all-ones ground truth
        if not predictions:
            return float("nan"), float("nan")
        true_labels = [1] * len(predictions)
        accuracy = sum(1 for pred in predictions if pred == 1) / len(predictions)
        return accuracy, f1_score(true_labels, predictions)

    # None marks a record without ground truth; it is not a model miss and
    # cannot be passed to f1_score alongside integer labels
    model_predictions = [
        data["model_detection"]
        for data in combined_data
        if data["model_detection"] is not None
    ]
    eye_tracking_predictions = [data["majority_vote"] for data in combined_data]

    model_accuracy, model_f1 = _score(model_predictions)
    eye_tracking_accuracy, eye_tracking_f1 = _score(eye_tracking_predictions)

    # Combine results into a dictionary
    result = {
        "model_accuracy": model_accuracy,
        "eye_tracking_accuracy": eye_tracking_accuracy,
        "model_f1": model_f1,
        "eye_tracking_f1": eye_tracking_f1
    }

    return result

# Score every record gathered by the main run
results = calculate_accuracy_and_f1(combined_data)

# Report each metric rounded to two decimals
for line in (
    f"Model Accuracy: {results['model_accuracy']:.2f}",
    f"Eye-Tracking Accuracy: {results['eye_tracking_accuracy']:.2f}",
    f"Model F1 Score: {results['model_f1']:.2f}",
    f"Eye-Tracking F1 Score: {results['eye_tracking_f1']:.2f}",
):
    print(line)

# Show the unrounded values as the cell's output
results

Model Accuracy: 0.89
Eye-Tracking Accuracy: 0.88
Model F1 Score: 0.94
Eye-Tracking F1 Score: 0.94


{'model_accuracy': 0.885336812917958,
 'eye_tracking_accuracy': 0.8848474963301256,
 'model_f1': 0.9391815901029501,
 'eye_tracking_f1': 0.9389061959155417}