In [None]:
# LOCAL = 1 indicates running this notebook locally, 0 indicates running it on Kaggle
LOCAL = 1

import os
if LOCAL != 1:
  GITHUB_USER = "magnusdtd"
  REPO_NAME = "ENTRep"
  BRANCH_NAME = "BioCLIP"

  # The token is read from Kaggle's secret store so it never appears in the notebook.
  from kaggle_secrets import UserSecretsClient
  user_secrets = UserSecretsClient()
  GITHUB_TOKEN = user_secrets.get_secret("GITHUB_TOKEN")

  # Clone only when the checkout is missing, so re-running this cell is harmless.
  if not os.path.isdir(REPO_NAME):
    clone_status = os.system(f"git clone --single-branch --branch {BRANCH_NAME} https://{GITHUB_USER}:{GITHUB_TOKEN}@github.com/{GITHUB_USER}/{REPO_NAME}.git")
    if clone_status != 0:
      # Fail here rather than at the import below. The command line is kept out
      # of the message on purpose because it embeds the token.
      raise RuntimeError(
        f"git clone of {GITHUB_USER}/{REPO_NAME} (branch {BRANCH_NAME}) failed with status {clone_status}"
      )
  os.chdir("/kaggle/working/")

  from ENTRep.utils.file import File
  File.make_train_path()
else:
  # NOTE: this is a relative move, so run the cell once per kernel session;
  # every additional run climbs one more directory up.
  os.chdir("..")

current_path = os.getcwd()
print("Current path:", current_path)

In [None]:
!pip install open_clip_torch

# Evaluation

In [None]:
from BioCLIP.make_submission import make_submission_cls_task, make_submission_t2i_task
from BioCLIP.evaluator import ImageToTextEvaluator, TextToImageEvaluator
from BioCLIP.data_preparation import DataPreparation
import pandas as pd

In [None]:
from PIL import Image
import pandas as pd
import torch
import open_clip
import os

class ImageToTextEvaluator:
  '''
  Zero-shot image classification evaluator built on an open_clip model.

  Each image listed in `df[path_column]` is assigned the entry of `labels`
  whose text embedding is most similar to the image embedding; the prediction
  is then compared with the value in `df[caption_column]`.
  '''

  def __init__(
    self,
    df: pd.DataFrame,
    labels: list[str],
    model_name: str,
    model_path: str,
    path_column: str,
    caption_column: str
  ):
    '''
    Args:
      df: dataframe with one row per image.
      labels: candidate label strings; a prediction is one of these.
      model_name: name passed to open_clip to build the model and tokenizer.
      model_path: value for open_clip's `pretrained` argument; when empty the
        model is created from `model_name` alone.
      path_column: column of `df` holding the image paths.
      caption_column: column of `df` holding the ground-truth label.
    '''
    self.df = df
    self.model_name = model_name
    self.model_path = model_path
    self.labels = labels
    self.path_column = path_column
    self.caption_column = caption_column

    self.device = "cuda" if torch.cuda.is_available() else "cpu"

    if self.model_path:
      self.model, _, self.preprocess_val = open_clip.create_model_and_transforms(self.model_name, pretrained=self.model_path)
    else:
      self.model, _, self.preprocess_val = open_clip.create_model_and_transforms(self.model_name)
    self.model.to(self.device)
    self.model.eval()
    self.tokenizer = open_clip.get_tokenizer(model_name)

  def _encode_labels(self):
    '''Return the L2-normalised text embeddings, one row per entry of `self.labels`.'''
    text_tokens = self.tokenizer(self.labels).to(self.device)
    with torch.no_grad():
      text_features = self.model.encode_text(text_tokens)
      text_features /= text_features.norm(dim=-1, keepdim=True)
    return text_features

  def _predict_labels(self):
    '''Run the model over every image in `self.df` and return the predicted labels in row order.'''
    # The label embeddings do not depend on the image, so compute them once per pass.
    text_features = self._encode_labels()

    predicted_labels = []
    for image_path in self.df[self.path_column]:
      # The context manager releases the file handle as soon as the tensor is built.
      with Image.open(image_path) as image:
        image_tensor = self.preprocess_val(image).unsqueeze(0).to(self.device)

      with torch.no_grad():
        image_features = self.model.encode_image(image_tensor)
        image_features /= image_features.norm(dim=-1, keepdim=True)
        text_probs = (100.0 * image_features @ text_features.T).softmax(dim=-1)

      predicted_label_idx = text_probs.argmax(dim=-1).item()
      predicted_labels.append(self.labels[predicted_label_idx])

    return predicted_labels

  def _count_correct(self):
    '''Return `(correct, total)`: the number of matching predictions and the number of rows.'''
    ground_truth = list(self.df[self.caption_column])
    if not ground_truth:
      return 0, 0

    predictions = self._predict_labels()
    correct = sum(1 for predicted, expected in zip(predictions, ground_truth) if predicted == expected)
    return correct, len(ground_truth)

  @staticmethod
  def _safe_ratio(numerator, denominator):
    '''Return `numerator / denominator`, or 0.0 when the denominator is zero.'''
    return numerator / denominator if denominator else 0.0

  def get_accuracy(self):
    '''Return the fraction of images whose predicted label equals the ground truth (0.0 for an empty df).'''
    correct_predictions, total_predictions = self._count_correct()
    return self._safe_ratio(correct_predictions, total_predictions)

  def get_precision(self):
    '''
    Return the micro-averaged precision.

    Every wrong prediction counts as one false positive, so for this
    single-label setting the value equals the accuracy.
    '''
    true_positives, total = self._count_correct()
    false_positives = total - true_positives
    return self._safe_ratio(true_positives, true_positives + false_positives)

  def get_recall(self):
    '''
    Return the micro-averaged recall.

    Every wrong prediction counts as one false negative, so for this
    single-label setting the value equals the accuracy.
    '''
    true_positives, total = self._count_correct()
    false_negatives = total - true_positives
    return self._safe_ratio(true_positives, true_positives + false_negatives)

  def get_f1_score(self):
    '''
    Return `(precision, recall, f1_score)` from a single inference pass.

    The F1 score is 0.0 when precision and recall are both zero.
    '''
    true_positives, total = self._count_correct()
    precision = self._safe_ratio(true_positives, total)
    recall = self._safe_ratio(true_positives, total)
    f1_score = self._safe_ratio(2 * (precision * recall), precision + recall)
    return precision, recall, f1_score


class TextToImageEvaluator:
  '''
  Text-to-image retrieval evaluator built on an open_clip model.

  Image embeddings are computed once at construction time; `get_recall_at_k`
  then ranks every image against every query.
  '''

  def __init__(
      self,
      df: pd.DataFrame,
      queries: list[str],
      model_name: str,
      model_path: str,
      path_column: str,
      caption_column: str
    ):
    '''
    Args:
      df: dataframe with one row per image.
      queries: text queries to evaluate.
      model_name: name passed to open_clip to build the model and tokenizer.
      model_path: value for open_clip's `pretrained` argument; when empty the
        model is created from `model_name` alone.
      path_column: column of `df` holding the image paths.
      caption_column: column of `df` holding the caption of each image; a query
        is considered answered by the images whose caption equals it.
    '''
    self.df = df
    self.model_name = model_name
    self.model_path = model_path
    self.queries = queries
    self.path_column = path_column
    self.caption_column = caption_column
    self.device = "cuda" if torch.cuda.is_available() else "cpu"

    if self.model_path:
      self.model, _, self.preprocess_val = open_clip.create_model_and_transforms(self.model_name, pretrained=self.model_path)
    else:
      self.model, _, self.preprocess_val = open_clip.create_model_and_transforms(self.model_name)
    self.model.to(self.device)
    self.model.eval()
    self.tokenizer = open_clip.get_tokenizer(self.model_name)
    self.query_tokens = self.tokenizer(queries).to(self.device)

    # image file name -> normalised image embedding
    self.image_features_dict = {}
    # caption text -> file names of the images carrying that caption (ground truth)
    self.relevant_images = {}
    self._feature_extract()

  def _feature_extract(self):
    '''Embed every image of `self.df` and record which images belong to which caption.'''
    for image_path, caption in zip(self.df[self.path_column], self.df[self.caption_column]):
      # Images are keyed by base name, so two paths sharing a base name overwrite each other.
      image_name = os.path.basename(image_path)
      # The context manager releases the file handle as soon as the tensor is built.
      with Image.open(image_path) as image:
        image_tensor = self.preprocess_val(image).unsqueeze(0).to(self.device)

      with torch.no_grad():
        image_features = self.model.encode_image(image_tensor)
        image_features /= image_features.norm(dim=-1, keepdim=True)

      self.image_features_dict[image_name] = image_features
      self.relevant_images.setdefault(caption, set()).add(image_name)

  def get_recall_at_k(self, k: int, verbose: bool = False):
    '''
    Return the fraction of queries with at least one relevant image among the
    `k` most similar images.

    An image is relevant to a query when its caption equals the query text. A
    query that matches no caption therefore counts as a miss.

    Args:
      k: number of top-ranked images inspected per query; values larger than
        the number of images are clamped.
      verbose: when True, print each query together with its top-k image names.

    Returns:
      Recall@k in [0, 1]; 0.0 when there are no queries, no images or k < 1.
    '''
    total_num_queries = len(self.queries)
    image_names = list(self.image_features_dict)
    top_k = max(0, min(k, len(image_names)))
    if total_num_queries == 0 or top_k == 0:
      return 0.0

    with torch.no_grad():
      text_features = self.model.encode_text(self.query_tokens)
      text_features /= text_features.norm(dim=-1, keepdim=True)
      text_features = text_features.float()
      # Stack the per-image rows so all queries are ranked with a single matmul.
      image_matrix = torch.cat([self.image_features_dict[name] for name in image_names], dim=0).float()
      similarities = text_features @ image_matrix.T
      top_k_indices = similarities.topk(top_k, dim=-1).indices.tolist()

    num_queries_with_correct_image_in_top_k = 0
    for query, indices in zip(self.queries, top_k_indices):
      top_k_images = [image_names[index] for index in indices]
      relevant = self.relevant_images.get(query, ())

      if any(image_name in relevant for image_name in top_k_images):
        num_queries_with_correct_image_in_top_k += 1

      if verbose:
        print(f"{query}, {top_k_images}")

    recall_at_k = num_queries_with_correct_image_in_top_k / total_num_queries
    return recall_at_k


In [None]:
# Build the caption dataframe used for the text-to-image evaluation.
data_preparation = DataPreparation()
image_to_text_df = data_preparation.preprocess_data()
image_to_text_df = data_preparation.detect_and_translate(image_to_text_df)
data_preparation.validate_dataframe(image_to_text_df)

# Resolve image paths against the current working directory (set by the setup
# cell: "/kaggle/working" on Kaggle, the parent directory locally) instead of
# hard-coding the Kaggle location.
image_to_text_df['Path'] = image_to_text_df['Path'].apply(lambda x: os.path.join(os.getcwd(), x))
queries = image_to_text_df['DescriptionEN'].to_list()

text_to_image_evaluator = TextToImageEvaluator(
    df=image_to_text_df,
    queries=queries,
    model_name='hf-hub:magnusdtd/bio-clip-cls-ft',
    model_path='',
    path_column='Path',
    caption_column='DescriptionEN'
)

# Evaluate recall at k
recall_at_k = text_to_image_evaluator.get_recall_at_k(k=10)
print(f"Recall at k: {recall_at_k}")

In [None]:
# cls.json maps image file name -> ground-truth label; turn it into a two-column frame.
image_to_text_df = pd.read_json('Dataset/train/cls.json', orient='index')
image_to_text_df = image_to_text_df.reset_index()
image_to_text_df.columns = ['Path', 'Ground Truth Label']

# Resolve the image folder against the current working directory, the same base
# the relative 'Dataset/train/cls.json' above is read from, instead of
# hard-coding the Kaggle location.
image_to_text_df['Path'] = image_to_text_df['Path'].apply(lambda x: os.path.join(os.getcwd(), "Dataset/train/imgs", x))

labels = [
    "nose-right",
    "nose-left",
    "ear-right",
    "ear-left",
    "vc-open",
    "vc-closed",
    "throat",
]

image_to_text_evaluator = ImageToTextEvaluator(
    df=image_to_text_df,
    labels=labels,
    model_name='hf-hub:magnusdtd/bio-clip-cls-ft',
    model_path='',
    path_column='Path',
    caption_column='Ground Truth Label'
)

accuracy = image_to_text_evaluator.get_accuracy()
print(f"Accuracy: {accuracy}")

precision, recall, f1_score = image_to_text_evaluator.get_f1_score()
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1_score}")

# Make submissions

In [None]:
import torch
from PIL import Image
import open_clip
import os
import json
import zipfile
import pandas as pd
import datetime

def make_submission_cls_task(
    model_name: str,
    model_path: str,
    test_file_path: str,
    labels_map: dict[str, int],
    output_folder_path: str = './results',
    image_folder_path: str = 'Dataset/test/imgs'
  ):
  '''
  Classify every test image zero-shot and write the predictions as a zipped JSON submission.

  Args:
    model_name: name passed to open_clip to build the model and tokenizer.
    model_path: value for open_clip's `pretrained` argument; when empty the
      model is created from `model_name` alone.
    test_file_path: header-less CSV with one image file name per row.
    labels_map: label text -> integer id written to the submission; the keys
      are the candidate labels scored against each image.
    output_folder_path: folder receiving the JSON file and the ZIP archive;
      created when missing.
    image_folder_path: folder containing the images named in the CSV.
  '''
  device = "cuda" if torch.cuda.is_available() else "cpu"
  test_df = pd.read_csv(test_file_path, header=None, names=['Path'])

  # Load BioCLIP model and tokenizer
  if model_path:
    model, _, preprocess_val = open_clip.create_model_and_transforms(model_name, pretrained=model_path)
  else:
    model, _, preprocess_val = open_clip.create_model_and_transforms(model_name)
  model.to(device)
  model.eval()
  tokenizer = open_clip.get_tokenizer(model_name)

  # The label embeddings are identical for every image, so encode them once.
  label_names = list(labels_map.keys())
  text = tokenizer(label_names).to(device)
  with torch.no_grad():
    text_features = model.encode_text(text)
    text_features /= text_features.norm(dim=-1, keepdim=True)

  predictions = {}
  for img_name in test_df['Path']:
    image_path = os.path.join(image_folder_path, img_name)
    # The context manager releases the file handle as soon as the tensor is built.
    with Image.open(image_path) as image:
      image_tensor = preprocess_val(image).unsqueeze(0).to(device)

    with torch.no_grad():
      image_features = model.encode_image(image_tensor)
      image_features /= image_features.norm(dim=-1, keepdim=True)
      text_probs = (100.0 * image_features @ text_features.T).softmax(dim=-1)

    best_match_idx = text_probs.argmax(dim=-1)
    predictions[img_name] = labels_map[label_names[best_match_idx.item()]]

  # Make sure the destination exists; the default './results' may be absent on a fresh checkout.
  os.makedirs(output_folder_path, exist_ok=True)

  # Generate a timestamped JSON filename with a fixed 'BioCLIP_cls_' prefix
  daytime = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
  json_file_name = f'BioCLIP_cls_{daytime}.json'
  json_file_path = os.path.join(output_folder_path, json_file_name)

  # Save predictions to JSON file
  with open(json_file_path, 'w') as json_file:
    json.dump(predictions, json_file)

  # Create ZIP archive with the same name as the JSON file
  zip_file_path = os.path.join(output_folder_path, f'BioCLIP_cls_{daytime}.zip')
  with zipfile.ZipFile(zip_file_path, 'w') as zip_file:
    zip_file.write(json_file_path, arcname=json_file_name)

  print(f"Submission file created at: {zip_file_path}")

def make_submission_t2i_task(
    model_name: str,
    model_path: str,
    test_file_path: str,
    image_folder_path: str,
    output_folder_path: str = './results'
  ):
  '''
  Retrieve the best-matching image for every text query and write the result as a zipped JSON submission.

  Args:
    model_name: name passed to open_clip to build the model and tokenizer.
    model_path: value for open_clip's `pretrained` argument; when empty the
      model is created from `model_name` alone.
    test_file_path: header-less CSV with one text query per row.
    image_folder_path: folder whose entries are all embedded as candidate images.
    output_folder_path: folder receiving the JSON file and the ZIP archive;
      created when missing.

  Raises:
    ValueError: if there are queries but `image_folder_path` contains no entries.
  '''
  test_df = pd.read_csv(test_file_path, header=None, names=['Query'])

  device = "cuda" if torch.cuda.is_available() else "cpu"
  if model_path:
    model, _, preprocess_val = open_clip.create_model_and_transforms(model_name, pretrained=model_path)
  else:
    model, _, preprocess_val = open_clip.create_model_and_transforms(model_name)
  model.to(device)
  model.eval()
  tokenizer = open_clip.get_tokenizer(model_name)

  # Preprocess text queries
  queries = test_df['Query'].to_list()
  text_tokens = tokenizer(queries).to(device)

  # Extract image features; names and rows are kept in the same order.
  image_names = []
  image_feature_rows = []
  for image_name in os.listdir(image_folder_path):
    image_path = os.path.join(image_folder_path, image_name)
    # The context manager releases the file handle as soon as the tensor is built.
    with Image.open(image_path) as image:
      image_tensor = preprocess_val(image).unsqueeze(0).to(device)

    with torch.no_grad():
      image_features = model.encode_image(image_tensor)
      image_features /= image_features.norm(dim=-1, keepdim=True)

    image_names.append(image_name)
    image_feature_rows.append(image_features)

  # Match text queries to images
  predictions = {}
  if queries:
    if not image_names:
      raise ValueError(f"No images found in {image_folder_path}")

    with torch.no_grad():
      text_features = model.encode_text(text_tokens)
      text_features /= text_features.norm(dim=-1, keepdim=True)
      # One matmul scores every query against every image at once.
      image_matrix = torch.cat(image_feature_rows, dim=0)
      best_match_indices = (text_features @ image_matrix.T).argmax(dim=-1).tolist()

    # Identical query strings share one key, so the last occurrence wins.
    predictions = {
      text_query: image_names[best_index]
      for text_query, best_index in zip(queries, best_match_indices)
    }

  # Make sure the destination exists; the default './results' may be absent on a fresh checkout.
  os.makedirs(output_folder_path, exist_ok=True)

  # Generate a timestamped JSON filename with a fixed 'BioCLIP_t2i_' prefix
  daytime = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
  json_file_name = f'BioCLIP_t2i_{daytime}.json'
  json_file_path = os.path.join(output_folder_path, json_file_name)

  # Save predictions to JSON file
  with open(json_file_path, 'w') as json_file:
    json.dump(predictions, json_file)

  # Create ZIP archive with the same name as the JSON file
  zip_file_path = os.path.join(output_folder_path, f'BioCLIP_t2i_{daytime}.zip')
  with zipfile.ZipFile(zip_file_path, 'w') as zip_file:
    zip_file.write(json_file_path, arcname=json_file_name)

  print(f"Submission file created at: {zip_file_path}")


In [None]:
# Classification submission: each label's position in the list below is the
# integer id written to the submission file for that label.
make_submission_cls_task(
  model_name="hf-hub:magnusdtd/bio-clip-cls-ft",
  model_path="",
  test_file_path="Dataset/test/cls.csv",
  labels_map={
    label_name: label_id
    for label_id, label_name in enumerate([
      "nose-right",
      "nose-left",
      "ear-right",
      "ear-left",
      "vc-open",
      "vc-closed",
      "throat",
    ])
  },
)

In [None]:
# Text-to-image submission: every query in Dataset/test/t2i.csv is matched
# against the images found in Dataset/test/imgs. The empty model_path means the
# model is created from model_name alone.
make_submission_t2i_task(
  model_name="hf-hub:magnusdtd/bio-clip-cls-ft",
  model_path="",
  test_file_path="Dataset/test/t2i.csv",
  image_folder_path="Dataset/test/imgs"
)