In [1]:
# install for AWS
!pip install torch --quiet
!pip install pandas --quiet
!pip install scikit-image --quiet
!pip install scikit-learn --quiet
!pip install matplotlib --quiet
!pip install torchvision --quiet
!pip install s3fs --quiet
!pip install boto3 --quiet
!pip install tqdm --quiet
!pip install fiftyone --quiet
!pip install pycocotools --quiet

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
boto3 1.28.68 requires botocore<1.32.0,>=1.31.68, but you have botocore 1.31.64 which is incompatible.[0m[31m
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
aiobotocore 2.7.0 requires botocore<1.31.65,>=1.31.16, but you have botocore 1.31.73 which is incompatible.[0m[31m
[0m

In [2]:
import os
import torch
import tarfile
import shutil
import torchvision
import random
import warnings
import boto3
import s3fs
import io
import time
import botocore.exceptions
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import getpass
import json
# import torch.jit as jit

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import transforms, utils, models, datasets
from torch import nn, optim
from torch.optim import lr_scheduler
from io import BytesIO
from tqdm import tqdm
from skimage import io, transform
from PIL import Image
from pycocotools import mask as maskUtils

warnings.filterwarnings("ignore")

import fiftyone as fo
import fiftyone.brain as fob
import fiftyone.zoo as foz
from fiftyone import ViewField as F

In [3]:
########## LM ##########

# access_key = getpass.getpass("Enter your access: ")

# secret_key = password = getpass.getpass("Enter your secret: ")

# bucket_name = 'w210facetdata'
# annotations_prefix = 'annotations/'
# images_prefix = '/home/ubuntu/W210-Capstone'

# s3 = s3fs.S3FileSystem(key=access_key, secret=secret_key)

# # Use s3.open to open the CSV file and read its content into a Pandas DataFrame
# with s3.open(f's3://{bucket_name}/{annotations_prefix}annotations.csv', 'rb') as file:
#     gt_df = pd.read_csv(file)

# ## use relative paths to your image dirs
# # dataset = fo.Dataset(name = "FACET14", persistent=True)
# dataset = fo.load_dataset('FACET14')
# # dataset.add_images_dir(images_prefix)
# dataset.compute_metadata()

In [4]:
########## KH ##########

# Initialize S3 client (region is pinned to us-west-2; credentials are not passed here,
# so boto3 resolves them from its default sources).
s3_client = boto3.client('s3', region_name='us-west-2')

# Define the S3 bucket name and prefixes.
# images_prefix is used as the listing prefix by the download cell below.
bucket_name = 'w210facetdata'
annotations_prefix = 'annotations/'
images_prefix = 'images/'

# Load CSV annotations from S3 into the notebook-global DataFrame `gt_df`,
# which the annotation helper functions below read by row index.
# NOTE(review): reading an s3:// URL presumably relies on the s3fs package installed in the first cell — confirm.
annotations_s3_path = f's3://{bucket_name}/{annotations_prefix}'
gt_df = pd.read_csv(f'{annotations_s3_path}annotations.csv')

In [None]:
########## KH ##########
# Local directory that receives the images. Only the basename of each S3 key is
# kept, so the bucket's key hierarchy is flattened into this single directory.
local_images_dir = 'local_images_dir'
os.makedirs(local_images_dir, exist_ok=True)

# Create a paginator to handle pagination of the results
paginator = s3_client.get_paginator('list_objects_v2')

# Use the paginator to retrieve all objects
for page in paginator.paginate(Bucket=bucket_name, Prefix=images_prefix):
    for obj in page.get('Contents', []):
        # Skip the prefix itself and any other "folder" placeholder key. A key that
        # ends in '/' has an empty basename, so the download target would be the
        # local directory itself and download_file would fail.
        if obj['Key'] == images_prefix or obj['Key'].endswith('/'):
            continue
        local_file_path = os.path.join(local_images_dir, os.path.basename(obj['Key']))
        s3_client.download_file(bucket_name, obj['Key'], local_file_path)


In [None]:
########## KH ##########
local_images_dir = 'local_images_dir'

# Start from a clean dataset on every run. delete_dataset() raises when no dataset
# of that name exists (e.g. on the very first run), so only delete when present.
if fo.dataset_exists('local_images_dir'):
    fo.delete_dataset('local_images_dir')
dataset = fo.Dataset(name='local_images_dir')

dataset.add_images_dir(local_images_dir)
# Populate sample.metadata; later cells read sample.metadata.width / .height
# to convert pixel bounding boxes into relative coordinates.
dataset.compute_metadata()

In [None]:
# Sanity check: how many regular files (sub-directories excluded) ended up in local_images_dir
num_files = sum(
    os.path.isfile(os.path.join(local_images_dir, entry))
    for entry in os.listdir(local_images_dir)
)
print(num_files)

# Object Detection Functions

In [None]:
BOOLEAN_PERSONAL_ATTRS = (
    "has_facial_hair",
    "has_tattoo",
    "has_cap",
    "has_mask",
    "has_headscarf",
    "has_eyeware",
)
def add_boolean_person_attributes(detection, row_index):
    """Copy the boolean person attributes of one annotation row onto ``detection``.

    For every column named in ``BOOLEAN_PERSONAL_ATTRS``, the value stored in
    ``gt_df`` at ``row_index`` is cast to bool and written to ``detection``
    under the same name.
    """
    for attr_name in BOOLEAN_PERSONAL_ATTRS:
        raw_value = gt_df.loc[row_index, attr_name]
        detection[attr_name] = raw_value.astype(bool)

In [None]:
def get_hairtype(row_index):
    """Return the hair type recorded for the ``gt_df`` row ``row_index``.

    Considers every column whose name starts with ``hairtype`` and picks the
    first one whose value equals 1. The result is the second ``_``-separated
    token of that column name, or ``None`` when no such column is set.
    """
    hairtype_columns = gt_df.columns.str.startswith('hairtype')
    row_values = gt_df.loc[row_index, hairtype_columns]
    active = row_values[row_values == 1]
    if active.empty:
        return None
    first_active_column = active.index[0]
    return first_active_column.split('_')[1]

def get_haircolor(row_index):
    """Return the hair colour recorded for the ``gt_df`` row ``row_index``.

    Considers every column whose name starts with ``hair_color`` and picks the
    first one whose value equals 1. The result is the third ``_``-separated
    token of that column name, or ``None`` when no such column is set.
    """
    haircolor_columns = gt_df.columns.str.startswith('hair_color')
    row_values = gt_df.loc[row_index, haircolor_columns]
    active = row_values[row_values == 1]
    if active.empty:
        return None
    first_active_column = active.index[0]
    return first_active_column.split('_')[2]

In [None]:
def add_person_attributes(detection, row_index):
    """Attach hair type, hair colour and the boolean person flags to ``detection``.

    All values are looked up for the annotation row ``row_index``.
    """
    hair_getters = {"hairtype": get_hairtype, "haircolor": get_haircolor}
    for field_name, getter in hair_getters.items():
        detection[field_name] = getter(row_index)
    add_boolean_person_attributes(detection, row_index)

In [None]:
def get_perceived_gender_presentation(row_index):
    """Return the perceived gender presentation for the ``gt_df`` row ``row_index``.

    Picks the first column starting with ``gender`` whose value equals 1 and
    returns its name with the ``gender_presentation_`` prefix removed and the
    remaining underscores replaced by spaces. Returns ``None`` when no such
    column is set.
    """
    gender_columns = gt_df.columns.str.startswith('gender')
    row_values = gt_df.loc[row_index, gender_columns]
    active = row_values[row_values == 1]
    if active.empty:
        return None
    column_name = active.index[0]
    return column_name.replace("gender_presentation_", "").replace("_", " ")

def get_perceived_age_presentation(row_index):
    """Return the perceived age presentation for the ``gt_df`` row ``row_index``.

    Picks the first column starting with ``age`` whose value equals 1 and
    returns the third ``_``-separated token of its name, or ``None`` when no
    such column is set.
    """
    age_columns = gt_df.columns.str.startswith('age')
    row_values = gt_df.loc[row_index, age_columns]
    active = row_values[row_values == 1]
    if active.empty:
        return None
    column_name = active.index[0]
    return column_name.split('_')[2]

In [None]:
def get_skintone(row_index):
    """Return every ``skin_tone*`` column of the ``gt_df`` row ``row_index`` as a dict.

    Keys are the column names; values are the row's entries for those columns.
    """
    is_skin_tone_column = gt_df.columns.str.startswith('skin_tone')
    return gt_df.loc[row_index, is_skin_tone_column].to_dict()

In [None]:
def add_protected_attributes(detection, row_index):
    """Attach the age, gender-presentation and skin-tone annotations to ``detection``.

    All values are looked up for the annotation row ``row_index``.
    """
    field_getters = (
        ("perceived_age_presentation", get_perceived_age_presentation),
        ("perceived_gender_presentation", get_perceived_gender_presentation),
        ("skin_tone", get_skintone),
    )
    for field_name, getter in field_getters:
        detection[field_name] = getter(row_index)

In [None]:
VISIBILITY_ATTRS = ("visible_torso", "visible_face", "visible_minimal")

In [None]:
def get_lighting(row_index):
    """Return the lighting condition recorded for the ``gt_df`` row ``row_index``.

    Picks the first column starting with ``lighting`` whose value equals 1 and
    returns its name with the ``lighting_`` prefix removed and underscores
    replaced by spaces. Returns ``None`` when no such column is set.
    """
    lighting_columns = gt_df.columns.str.startswith('lighting')
    row_values = gt_df.loc[row_index, lighting_columns]
    active = row_values[row_values == 1]
    if active.empty:
        return None
    column_name = active.index[0]
    return column_name.replace("lighting_", "").replace("_", " ")

def add_other_attributes(detection, row_index):
    """Attach the lighting label and the visibility flags to ``detection``.

    Each column named in ``VISIBILITY_ATTRS`` is read from ``gt_df`` at
    ``row_index``, cast to bool and stored on ``detection`` under the same name.
    """
    detection["lighting"] = get_lighting(row_index)
    for flag_name in VISIBILITY_ATTRS:
        flag_value = gt_df.loc[row_index, flag_name]
        detection[flag_name] = flag_value.astype(bool)

In [None]:
def create_detection(row_index, sample):
    """Build a ``fo.Detection`` from the ``gt_df`` annotation row ``row_index``.

    The row's ``bounding_box`` column holds a JSON object with pixel
    ``x``/``y``/``width``/``height`` and a ``dict_attributes`` entry carrying
    ``cat1`` (used as the label) and ``cat2`` (stored as ``class2`` unless it
    is the string ``'none'``). Pixel values are divided by the image size from
    ``sample.metadata`` to obtain relative coordinates.

    Returns the detection with person, protected and other attributes attached.
    """
    box = json.loads(gt_df.loc[row_index, "bounding_box"])
    left, top = box["x"], box["y"]
    box_width, box_height = box["width"], box["height"]
    categories = box["dict_attributes"]
    primary_label, secondary_label = categories["cat1"], categories["cat2"]

    person_id = gt_df.loc[row_index, "person_id"]

    # sample.metadata must already be populated (compute_metadata) for this to work.
    image_width = sample.metadata.width
    image_height = sample.metadata.height
    relative_box = [
        left / image_width,
        top / image_height,
        box_width / image_width,
        box_height / image_height,
    ]

    detection = fo.Detection(
        label=primary_label,
        bounding_box=relative_box,
        person_id=person_id,
    )
    if secondary_label != 'none':
        detection["class2"] = secondary_label

    # Attach the three attribute groups in the same order as they are defined above.
    for attach in (add_person_attributes, add_protected_attributes, add_other_attributes):
        attach(detection, row_index)

    return detection

In [None]:
def add_ground_truth_labels(dataset):
    """Populate ``sample["ground_truth"]`` for every sample in ``dataset``.

    Each sample receives a ``fo.Detections`` holding one detection per
    ``gt_df`` row whose ``filename`` equals ``sample.filename``; samples with
    no matching row receive an empty detection list. Samples are saved through
    ``iter_samples(autosave=True)``, and ``add_dynamic_sample_fields()`` is
    called once at the end.
    """
    # Group the annotation row labels by filename once up front, instead of
    # scanning the whole DataFrame with a boolean mask for every sample.
    rows_by_filename = gt_df.groupby("filename").groups

    for sample in dataset.iter_samples(autosave=True, progress=True):
        row_indices = rows_by_filename.get(sample.filename, [])
        detections = [create_detection(row_index, sample) for row_index in row_indices]
        sample["ground_truth"] = fo.Detections(detections=detections)
    dataset.add_dynamic_sample_fields()



# Add labels

In [None]:
## add all of the ground truth labels
add_ground_truth_labels(dataset)

# Add Masks

In [None]:
def add_coco_masks_to_dataset(dataset):
    """Attach the COCO-format masks stored in S3 to the samples of ``dataset``.

    Downloads ``annotations/coco_masks.json`` from the ``w210facetdata`` bucket.
    For every sample whose filename is listed in that file and that has at
    least one annotation, a ``fo.Detections`` is stored in
    ``sample["coco_masks"]``. Each detection carries the category name as
    label, the relative ``[x, y, w, h]`` box, the ``person_id`` / ``ann_id``
    from the annotation and the mask cropped to the box.

    Requires ``sample.metadata`` (width/height) to be populated.
    """
    from collections import defaultdict

    ########## LM ##########
    # with s3.open(f's3://{bucket_name}/{annotations_prefix}coco_masks.json', 'rb') as file:
    #     coco_masks = json.load(file)

    ########## KH ##########
    s3 = boto3.client('s3')
    bucket_name = 'w210facetdata'
    object_key = 'annotations/coco_masks.json'
    s3_object = s3.get_object(Bucket=bucket_name, Key=object_key)
    s3_file_content = s3_object['Body'].read().decode('utf-8')
    coco_masks = json.loads(s3_file_content)

    filename_to_id = {
        img["file_name"]: img["id"]
        for img in coco_masks["images"]
    }
    cat_to_label = {cat["id"]: cat["name"] for cat in coco_masks["categories"]}

    # Index the annotations by image id once, so each sample is a dict lookup
    # rather than a scan over the full annotation list.
    annos_by_image = defaultdict(list)
    for ann in coco_masks["annotations"]:
        annos_by_image[ann["image_id"]].append(ann)

    for sample in dataset.iter_samples(autosave=True, progress=True):
        fn = sample.filename
        if fn not in filename_to_id:
            continue

        sample_annos = annos_by_image.get(filename_to_id[fn])
        if not sample_annos:
            continue

        img_width, img_height = sample.metadata.width, sample.metadata.height

        coco_detections = []
        for ann in sample_annos:
            # Unpack into locals so the loaded annotation's bbox list is not modified.
            x, y, w, h = ann['bbox']

            # Decode the segmentation to a full-image mask, then crop it to the
            # box; PIL's crop() takes absolute (left, upper, right, lower).
            full_mask = Image.fromarray(255 * maskUtils.decode(ann["segmentation"]))
            cropped_mask = np.array(full_mask.crop((x, y, x + w, y + h))).astype(bool)

            # Relative [x, y, w, h] coordinates, as passed to fo.Detection.
            relative_bbox = [x / img_width, y / img_height, w / img_width, h / img_height]

            coco_detections.append(
                fo.Detection(
                    label=cat_to_label[ann["category_id"]],
                    bounding_box=relative_bbox,
                    person_id=ann['facet_person_id'],
                    ann_id=ann['ann_id'],
                    mask=cropped_mask,
                )
            )
        sample["coco_masks"] = fo.Detections(detections=coco_detections)

## add the masks
add_coco_masks_to_dataset(dataset)

# Import Yolo

In [None]:
yolov5 = foz.load_zoo_model('yolov5m-coco-torch')

In [None]:
dataset.apply_model(yolov5, label_field="yolov5m")
### Just retain the "person" detections
# only_matches=False keeps samples that have no "person" detection in the view
# (with an empty detection list). With the default only_matches=True those samples
# are dropped, so values() would return fewer entries than the dataset has samples
# and set_values() could not line them up with the dataset.
people_view_values = dataset.filter_labels(
    "yolov5m", F("label") == "person", only_matches=False
).values("yolov5m")
dataset.set_values("yolov5m", people_view_values)
dataset.save()

# CLIP classification model (to be replaced with the teacher/student model)

In [None]:
## get a list of all 52 classes
facet_classes = dataset.distinct("ground_truth.detections.label")

## instantiate a CLIP model with these classes
clip = foz.load_zoo_model(
    "clip-vit-base32-torch",
    text_prompt="A photo of a",
    classes=facet_classes,
)

In [None]:
# One patch per ground-truth detection; CLIP classifies each patch.
patch_view = dataset.to_patches("ground_truth")
patch_view.apply_model(clip, label_field="clip")
# overwrite=True makes this cell safe to re-run when a saved view named
# "patch_view" already exists on the dataset (matches the later save_view call
# in _evaluate_classification_model).
dataset.save_view("patch_view", patch_view, overwrite=True)

In [None]:
IOU_THRESHS = np.round(np.arange(0.5, 1.0, 0.05), 2)

In [None]:
def _evaluate_detection_model(dataset, label_field):
    """Evaluate ``label_field`` against ``ground_truth`` and flag each prediction per IoU threshold.

    Runs ``dataset.evaluate_detections`` (class-agnostic, ``classwise=False``)
    under the eval key ``"eval_" + label_field`` (dashes replaced by
    underscores). Then, for every predicted detection that carries an
    ``<eval_key>_iou`` value, one boolean field per entry of ``IOU_THRESHS``
    is stored, named ``<eval_key>_iou_<threshold without the dot>`` and set to
    whether the IoU reaches that threshold.

    Samples whose ``label_field`` is ``None`` are skipped.
    """
    eval_key = "eval_" + label_field.replace("-", "_")
    iou_field = f"{eval_key}_iou"
    dataset.evaluate_detections(label_field, "ground_truth", eval_key=eval_key, classwise=False)

    # The per-threshold field names do not depend on the sample; build them once.
    threshold_fields = [
        (threshold, f"{iou_field}_{str(threshold).replace('.', '')}")
        for threshold in IOU_THRESHS
    ]

    for sample in dataset.iter_samples(progress=True):
        predictions = sample[label_field]
        if predictions is None:
            # No predictions stored for this sample; nothing to flag or save.
            continue

        for pred in predictions.detections:
            if iou_field not in pred:
                continue

            iou = pred[iou_field]
            for threshold, flag_field in threshold_fields:
                # Store a plain Python bool rather than a numpy bool.
                pred[flag_field] = bool(iou >= threshold)
        sample.save()

In [None]:
_evaluate_detection_model(dataset, 'yolov5m')

In [None]:
def _compute_detection_mAR(sample_collection, label_field):
    """Return the mean average recall of the detections stored in ``label_field``.

    For each threshold in ``IOU_THRESHS`` the per-threshold boolean flags
    written by ``_evaluate_detection_model`` are counted; recall at that
    threshold is ``True / (True + False)`` (0.0 when there are no flags).
    The result is the mean of those recalls over all thresholds.
    """
    eval_key = "eval_" + label_field.replace("-", "_")
    flag_prefix = f"{label_field}.detections.{eval_key}_iou_"

    def recall_at(threshold):
        # Field name suffix is the threshold with its decimal point removed.
        counts = sample_collection.count_values(flag_prefix + str(threshold).replace('.', ''))
        hits = counts.get(True, 0)
        misses = counts.get(False, 0)
        total = hits + misses
        return hits / float(total) if total > 0 else 0.0

    return np.mean([recall_at(threshold) for threshold in IOU_THRESHS])

In [None]:
def get_concept_attr_detection_mAR(dataset, label_field, concept, attributes):
    """Return the detection mAR of ``label_field`` restricted to one concept and attribute set.

    ``dataset`` is narrowed to ground-truth detections labelled ``concept``,
    then narrowed once more per entry of ``attributes``. Keys containing
    ``skin_tone`` are matched as ``skin_tone.<key> != 0`` (the value is
    ignored); every other key must equal its value.
    """
    sub_view = dataset.filter_labels("ground_truth", F("label") == concept)
    for attr_name, attr_value in attributes.items():
        if "skin_tone" in attr_name:
            condition = F(f"skin_tone.{attr_name}") != 0
        else:
            condition = F(attr_name) == attr_value
        sub_view = sub_view.filter_labels("ground_truth", condition)
    return _compute_detection_mAR(sub_view, label_field)

In [None]:
concept = 'lawman'
attributes = {"hairtype": "straight", "haircolor": "brown"}
get_concept_attr_detection_mAR(dataset, "yolov5m", concept, attributes)


In [None]:
def _evaluate_classification_model(dataset, prediction_field):
    """Mark every patch of the saved ``patch_view`` as correctly or incorrectly classified.

    For each patch, ``eval_<prediction_field>`` is set to whether the
    ground-truth label equals the label stored in ``prediction_field``. The
    view is then saved back under the name ``patch_view`` (overwriting it).
    """
    patch_view = dataset.load_saved_view("patch_view")
    eval_key = "eval_" + prediction_field

    for patch in patch_view.iter_samples(progress=True):
        expected_label = patch.ground_truth.label
        predicted_label = patch[prediction_field].label
        patch[eval_key] = expected_label == predicted_label
        patch.save()
    dataset.save_view("patch_view", patch_view, overwrite=True)

In [None]:
_evaluate_classification_model(dataset, 'clip')

In [None]:
def _compute_classification_recall(patch_collection, label_field):
    eval_key = "eval_" + label_field.split("_")[0]
    counts = patch_collection.count_values(eval_key)
    tp, fn = counts.get(True, 0), counts.get(False, 0)
    recall = tp/float(tp + fn) if tp + fn > 0 else 0.0
    return recall

In [None]:
def get_concept_attr_classification_recall(dataset, label_field, concept, attributes):
    """Return the classification recall of ``label_field`` for one concept and attribute set.

    Loads the saved ``patch_view``, keeps patches whose ground-truth label is
    ``concept`` and narrows further per entry of ``attributes``. Keys
    containing ``skin_tone`` are matched as
    ``ground_truth.skin_tone.<key> != 0`` (the value is ignored); every other
    key must equal its value.
    """
    sub_patch_view = dataset.load_saved_view("patch_view").match(
        F("ground_truth.label") == concept
    )
    for attr_name, attr_value in attributes.items():
        if "skin_tone" in attr_name:
            condition = F(f"ground_truth.skin_tone.{attr_name}") != 0
        else:
            condition = F(f"ground_truth.{attr_name}") == attr_value
        sub_patch_view = sub_patch_view.match(condition)
    return _compute_classification_recall(sub_patch_view, label_field)

In [None]:
attribute = {'hairtype': 'curly'}

In [None]:
get_concept_attr_classification_recall(dataset, "clip", concept, attribute)


In [None]:
def get_concept_attr_recall(dataset, label_field, concept, attribute):
    """Return the recall of ``label_field`` for ``concept`` restricted by ``attribute``.

    When ``label_field`` is a field in the dataset's schema the detection mAR
    is computed; otherwise the patch-level classification recall is used.
    """
    is_dataset_field = label_field in dataset.get_field_schema()
    recall_fn = (
        get_concept_attr_detection_mAR
        if is_dataset_field
        else get_concept_attr_classification_recall
    )
    return recall_fn(dataset, label_field, concept, attribute)

In [None]:
def compute_disparity(dataset, label_field, concept, attribute1, attribute2):
    """Return recall(``attribute1``) minus recall(``attribute2``) for ``concept``.

    A positive value means ``label_field`` performs better on the
    ``attribute1`` group than on the ``attribute2`` group.
    """
    first_recall, second_recall = (
        get_concept_attr_recall(dataset, label_field, concept, attrs)
        for attrs in (attribute1, attribute2)
    )
    return first_recall - second_recall

In [None]:
attrs1 = {"perceived_gender_presentation": "fem"}
attrs2 = {"hairtype": "straight"}
for concept in ["astronaut", "singer", "judge", "student"]:
    disparity = compute_disparity(dataset, "clip", concept, attrs1, attrs2)     
    print(f"{concept}: {disparity}")

In [None]:
get_concept_attr_classification_recall(dataset, 'clip', 'singer', attrs1)

# Experimenting with uploading custom models

In [None]:
# NOTE: several modules below (torch, torchvision, json, nn, optim, DataLoader) are
# already imported in the notebook's first import cell and are repeated here.
import fiftyone.utils.torch as fout
from torchvision.models import resnet50
import torchvision
import torch
import torch.nn as nn
# WARNING: this rebinds the notebook-global name `F`, which the first import cell binds to
# fiftyone's ViewField. Functions defined in earlier cells that build view expressions
# with `F(...)` resolve `F` at call time, so calling them after this cell has run
# would use torch.nn.functional instead.
import torch.nn.functional as F
from torch.utils.data import DataLoader
import fiftyone.core.expressions as foe
# Alias for fiftyone's ViewField that does not collide with `F` above.
from fiftyone import ViewField as VF
import json
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [None]:
# Load the student model architecture and weights.
# map_location=device lets a checkpoint that was saved on a GPU be loaded on a
# CPU-only machine (and places the tensors on the device used by this notebook).
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source. Recent PyTorch releases may also need weights_only=False to
# load a fully pickled model — confirm against the installed version.
student_model = torch.load('student_model.pth', map_location=device)
student_model.load_state_dict(torch.load('student_model_weights.pth', map_location=device))


# Set the number of new classes
num_new_classes = 52

# Modify the student model for the FACET dataset
class AdaptedStudentModel(nn.Module):
    """Student model with its last child module replaced by a new linear classifier.

    Args:
        student_model: module whose children, except the last one, are reused
            in order as the feature extractor.
        num_classes: number of outputs of the new classification layer.
        in_features: flattened feature size per sample expected by the new
            layer. Defaults to ``16 * 15 * 15``.
            NOTE(review): this must match what ``features`` produces for the
            image size fed to the model — confirm against the student
            architecture and the data loader's resize.
    """

    def __init__(self, student_model, num_classes, in_features=16 * 15 * 15):
        super().__init__()
        # Preserve the features part of the student model. Only child modules are
        # kept; any functional ops in the student's own forward() are not carried over.
        self.features = nn.Sequential(*list(student_model.children())[:-1])
        # New linear layer for the new number of classes
        self.fc1 = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return the class logits of shape ``(batch, num_classes)`` for ``x``."""
        x = self.features(x)
        # Flatten per sample. Unlike view(-1, in_features), this never regroups
        # elements across the batch dimension: a feature-size mismatch raises in
        # the linear layer instead of silently producing a different batch size.
        x = torch.flatten(x, 1)
        return self.fc1(x)

# Build the adapted model with the class count configured in num_new_classes.
adapted_student_model = AdaptedStudentModel(student_model, num_classes=num_new_classes)
model = adapted_student_model
# Switch to evaluation mode before running inference.
model.eval()


In [None]:
def make_data_loader(image_paths, sample_ids, batch_size):
    """Return a ``DataLoader`` over ``image_paths`` that also carries ``sample_ids``.

    Every image is resized to 256x256, converted to a tensor and normalised
    with the fixed per-channel mean/std below. Batches are loaded with four
    worker processes.
    """
    channel_mean = [0.4914, 0.4822, 0.4465]
    channel_std = [0.2023, 0.1994, 0.2010]
    preprocessing_steps = [
        torchvision.transforms.Resize((256, 256)),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(channel_mean, channel_std),
    ]
    image_dataset = fout.TorchImageDataset(
        image_paths,
        sample_ids=sample_ids,
        transform=torchvision.transforms.Compose(preprocessing_steps),
    )
    return DataLoader(image_dataset, batch_size=batch_size, num_workers=4)

student = model.to(device)
student

In [None]:
# # Load the saved architecture
# with open('student_architecture.json', 'r') as f:
#     student_architecture = json.load(f)

# # Define the Student model based on the loaded architecture
# class Student(nn.Module):
#     def __init__(self, num_classes):
#         super(Student, self).__init__()
#         self.conv1 = nn.Conv2d(*student_architecture['conv1'])
#         self.pool = nn.MaxPool2d(*student_architecture['pool'])
#         self.fc1 = nn.Linear(*student_architecture['fc1'])
#         self.fc2 = nn.Linear(student_architecture['fc2'][0], num_classes)  # new number of classes

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = x.view(-1, self.fc1.in_features)
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x

# # Create a Student model instance with the new number of classes
# num_new_classes = 1000  # replace with the actual number of new classes
# student = Student(num_new_classes)

# # Load the model weights
# state_dict = torch.load('student_weights.pth')

# # Remove the weights of the fc2 layer from the saved state_dict as we are going to initialize it randomly
# state_dict = {k: v for k, v in state_dict.items() if "fc2" not in k}

# # Load the state_dict into the student model
# student.load_state_dict(state_dict, strict=False)

# # Move the model to the appropriate device (e.g., GPU if available)
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# student = student.to(device)

In [None]:
# # Define the Student model
# class Student(nn.Module):
#     def __init__(self):
#         super(Student, self).__init__()
#         self.conv1 = nn.Conv2d(3, 16, 5)
#         self.pool = nn.MaxPool2d(2, 2)
#         self.fc1 = nn.Linear(16*14*14, 120)
#         self.fc2 = nn.Linear(120, 10)

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = x.view(-1, 16*14*14)
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x

# # Create a new instance of the Student model and load the pretrained weights
# model = Student()
# model.load_state_dict(torch.load('student.pth'))
# model.eval()

# # Set the number of new classes
# num_new_classes = 52

# # Modify the final fully connected (fc) layer
# # Get the number of input features to the fc2 layer
# in_features = model.fc2.in_features

# # Replace the fc2 layer with a new one for the desired number of classes
# model.fc2 = nn.Linear(in_features, num_new_classes)

# def make_data_loader(image_paths, sample_ids, batch_size):
#     mean = [0.4914, 0.4822, 0.4465]
#     std = [0.2023, 0.1994, 0.2010]
#     transforms = torchvision.transforms.Compose(
#         [
#             torchvision.transforms.Resize((256, 256)),
#             torchvision.transforms.ToTensor(),
#             torchvision.transforms.Normalize(mean, std),
#         ]
#     )
#     dataset = fout.TorchImageDataset(
#         image_paths, sample_ids=sample_ids, transform=transforms
#     )
#     return DataLoader(dataset, batch_size=batch_size, num_workers=4)

In [None]:
# model = resnet50(pretrained=True)
# num_new_classes = 52

# # Modify the final fully connected (fc) layer
# # Get the number of input features to the fc layer
# in_features = model.fc.in_features

# # Replace the fc layer with a new one for the desired number of classes
# model.fc = nn.Linear(in_features, num_new_classes)

# def make_data_loader(image_paths, sample_ids, batch_size):
#     mean = [0.4914, 0.4822, 0.4465]
#     std = [0.2023, 0.1994, 0.2010]
#     transforms = torchvision.transforms.Compose(
#         [
#             torchvision.transforms.Resize((256, 256)),
#             torchvision.transforms.ToTensor(),
#             torchvision.transforms.Normalize(mean, std),
#         ]
#     )
#     dataset = fout.TorchImageDataset(
#         image_paths, sample_ids=sample_ids, transform=transforms
#     )
#     return DataLoader(dataset, batch_size=batch_size, num_workers=4)

In [None]:
def predict(model, imgs):
    """Run ``model`` on a batch and return the predicted class indices and confidences.

    Args:
        model: callable returning a ``(batch, num_classes)`` tensor of logits.
        imgs: batch passed to ``model`` unchanged.

    Returns:
        ``(predictions, confidences)``: two NumPy arrays of length ``batch``.
        ``predictions`` holds the argmax class index per row; ``confidences``
        holds the softmax probability of that class.
    """
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        logits = model(imgs).detach().cpu().numpy()
    predictions = np.argmax(logits, axis=1)
    # predictions = np.argmax(logits, axis=1) - 1
    # print(f'Max prediction: {np.max(predictions)}, Min prediction: {np.min(predictions)}')

    # Softmax with the row maximum subtracted first. The result is mathematically
    # the same, but exp() can no longer overflow to inf (which gave nan confidences).
    odds = np.exp(logits - np.max(logits, axis=1, keepdims=True))
    confidences = np.max(odds, axis=1) / np.sum(odds, axis=1)
    return predictions, confidences

In [None]:
num_samples = 1000
batch_size = 5

# Fixed seed so the same random subset is drawn on every run.
view = dataset.take(num_samples, seed=51)

# Class vocabulary: classes[i] is the label reported for predicted class index i.
# It has to be the list of distinct labels. A list with one entry per sample would
# map a class index onto whatever label the i-th sample happens to have, and
# building it from detections[0] fails on samples without detections.
# NOTE(review): the model is built with num_new_classes outputs; confirm that this
# list has the same length and the order the model is (or will be) trained with.
classes = dataset.distinct("ground_truth.detections.label")

image_paths, sample_ids = zip(*[(s.filepath, s.id) for s in view])
data_loader = make_data_loader(image_paths, sample_ids, batch_size)


In [None]:
# num_samples = 1000
# batch_size = 5

# view = dataset.take(num_samples, seed=51)
# classes = set()  # Use a set to collect unique classes
# for i in view.iter_samples():
#     if i.ground_truth.detections:  # Check if detections is not empty
#         classes.add(i.ground_truth.detections[0].label)
#     else:
#         print(f"Sample {i.id} has no detections")

# image_paths, sample_ids = zip(*[(s.filepath, s.id) for s in view])
# data_loader = make_data_loader(image_paths, sample_ids, batch_size)

In [None]:
# # Convert classes set to a list and create a mapping from class labels to indices
# classes = list(classes)
# class_to_idx = {cls: idx for idx, cls in enumerate(classes)}

# image_paths, sample_ids = zip(*[(s.filepath, s.id) for s in view])
# data_loader = make_data_loader(image_paths, sample_ids, batch_size)

In [None]:
#
# Perform prediction and store results in dataset
#

# Each batch from the loader yields the image tensors and their FiftyOne sample ids.
for imgs, sample_ids in data_loader:
    # Move the batch to the device selected in the `device` variable.
    imgs = imgs.to(device)
    predictions, confidences = predict(model, imgs)

    # Add predictions to your FiftyOne dataset
    for sample_id, prediction, confidence in zip(
        sample_ids, predictions, confidences
    ):
        # Look the sample up by id and write the prediction to its "pred" field.
        sample = dataset[sample_id]
        sample["pred"] = fo.Classification(
            label=classes[prediction],  # `classes` is indexed by the predicted class index
            confidence=confidence,
        )
        sample.save()

In [None]:
def _evaluate_classification_modelr(dataset, prediction_field):
    eval_key = "eval_" + prediction_field
    
    for sample in dataset.iter_samples(progress=True):
        sample[eval_key] = (
            sample.ground_truth.detections[0].label == sample[prediction_field].label
        )
        sample.save()

In [None]:
# def _evaluate_classification_modelr(dataset, prediction_field):
#     eval_key = "eval_" + prediction_field
    
#     for sample in dataset.iter_samples(progress=True):
#         # Check if ground_truth.detections is not empty
#         if sample.ground_truth.detections:
#             sample[eval_key] = (
#                 sample.ground_truth.detections[0].label == sample[prediction_field].label
#             )
#             sample.save()
#         else:
#             print(f"Sample {sample.id} has no detections")

In [None]:
_evaluate_classification_modelr(view, 'pred')

In [None]:
def _compute_classification_recall(patch_collection, label_field):
    eval_key = "eval_" + label_field
    counts = patch_collection.count_values(eval_key)
    tp, fn = counts.get(True, 0), counts.get(False, 0)
    recall = tp/float(tp + fn) if tp + fn > 0 else 0.0
    return recall

In [None]:
def get_concept_attr_classification_recall(dataset, label_field, concept, attributes):
    """Return the classification recall of ``label_field`` for one concept and attribute set.

    This redefines the helper of the same name from an earlier cell. It filters
    the ``ground_truth`` detections of ``dataset`` directly rather than loading
    the saved patch view. Keys of ``attributes`` containing ``skin_tone`` are
    matched as ``skin_tone.<key> != 0`` (the value is ignored); every other key
    must equal its value.
    """
    # VF is fiftyone's ViewField; the usual alias F is bound to torch.nn.functional
    # by the import cell of this section.
    filtered_view = dataset.filter_labels("ground_truth", VF("label") == concept)
    for attr_name, attr_value in attributes.items():
        if "skin_tone" in attr_name:
            condition = VF(f"skin_tone.{attr_name}") != 0
        else:
            condition = VF(f"{attr_name}") == attr_value
        filtered_view = filtered_view.filter_labels('ground_truth', condition)
    return _compute_classification_recall(filtered_view, label_field)

In [None]:
concept = 'lawman'
attributes = {"perceived_gender_presentation": "masc"}

get_concept_attr_classification_recall(view, 'pred', concept, attributes)

In [None]:
view

In [None]:
dataset