In [None]:
!pip install osfclient --quiet
!pip install plotnine --quiet


In [None]:
import os
from google.colab import userdata
import numpy as np
import pandas as pd
import tensorflow as tf
from tqdm import tqdm
import cv2
import matplotlib.pyplot as plt
import argparse

import osfclient

import et_util.dataset_utils as dataset_utils
from et_util.dataset_utils import parse_single_eye_tfrecord as parse, rescale_coords_map


In [None]:
# Copy the OSF credentials out of the Colab secret store into the process
# environment so that later shell commands (`!osf ...`) inherit them.
for env_name, secret_name in (
    ('OSF_TOKEN', 'osftoken'),
    ('OSF_USERNAME', 'osfusername'),
):
    os.environ[env_name] = userdata.get(secret_name)

In [None]:
# Download the TFRecord archive from OSF project `uf2sh` into the working
# directory using the osfclient CLI.
# NOTE(review): presumably authenticated through the OSF_TOKEN / OSF_USERNAME
# environment variables set in the previous cell -- confirm.
!osf -p uf2sh fetch single_eye_tfrecords.tar.gz

In [None]:
!mkdir single_eye_tfrecords
!tar -xf single_eye_tfrecords.tar.gz -C single_eye_tfrecords

In [None]:
from et_util.dataset_utils import parse_single_eye_tfrecord as parse, rescale_coords_map


In [None]:
def _group_by_subject(img, phase, coords, subject_id):
    """Grouping key handed to the splitter: the record's subject id."""
    return subject_id


# Build the dataset from the extracted TFRecords. All split weight is put on
# the first returned dataset (train_split=1.0); the other two are discarded.
# NOTE(review): the result is named `test_data` although it comes from the
# "train" slot -- presumably it holds every record; confirm against
# dataset_utils.process_tfr_to_tfds.
test_data, _, _ = dataset_utils.process_tfr_to_tfds(
    'single_eye_tfrecords/',
    parse,
    train_split=1.0,
    val_split=0.0,
    test_split=0.0,
    random_seed=12604,
    group_function=_group_by_subject,
)

In [None]:
class ImageQualityMetrics:
    """Static helpers that score the quality of a single image.

    Every metric accepts a NumPy array, an array-like, or any object that
    exposes a ``.numpy()`` method (for example an eager TensorFlow tensor),
    and returns a plain Python ``float``.
    """

    @staticmethod
    def to_numpy(img_tensor):
        """Return the image as a NumPy array that has an explicit channel axis.

        Args:
            img_tensor: NumPy array, array-like, or an object with a callable
                ``.numpy()`` method.

        Returns:
            ``np.ndarray``; a 2-D input ``(H, W)`` is returned as ``(H, W, 1)``,
            any other rank is returned unchanged.
        """
        # Duck-type on `.numpy()` instead of checking for a specific tensor
        # class, so the helper does not depend on one framework being imported.
        as_numpy = getattr(img_tensor, 'numpy', None)
        img = as_numpy() if callable(as_numpy) else np.asarray(img_tensor)
        # Ensure grayscale images have a trailing channel dimension.
        if img.ndim == 2:
            img = np.expand_dims(img, axis=-1)
        return img

    @staticmethod
    def _to_gray(img):
        """Return a single-channel view of the image for the OpenCV filters.

        Three-channel input is converted with ``cv2.COLOR_RGB2GRAY``; anything
        else just has its singleton axes squeezed away.
        """
        img = ImageQualityMetrics.to_numpy(img)
        if img.shape[-1] == 3:
            # cv2.cvtColor only accepts 8-bit, 16-bit unsigned and float32
            # input, so other dtypes (e.g. float64) are cast to float32 first.
            if img.dtype.type not in (np.uint8, np.uint16, np.float32):
                img = img.astype(np.float32)
            return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        return img.squeeze()

    @staticmethod
    def brightness(img):
        """Return the mean pixel value of the image."""
        img = ImageQualityMetrics.to_numpy(img)
        return float(np.mean(img))

    @staticmethod
    def contrast(img):
        """Return the contrast, defined as the standard deviation of pixel values."""
        img = ImageQualityMetrics.to_numpy(img)
        return float(np.std(img))

    @staticmethod
    def entropy(img, bins=256, value_range=(0, 255)):
        """Return the Shannon entropy (in bits) of the pixel-value histogram.

        Args:
            img: Image to analyse.
            bins: Number of histogram bins.
            value_range: ``(low, high)`` range covered by the histogram. Pixels
                outside it are ignored, so an image stored as floats in
                ``[0, 1]`` needs ``value_range=(0, 1)``.

        Returns:
            Entropy as a ``float``; ``0.0`` when no pixel falls inside
            ``value_range``.
        """
        img = ImageQualityMetrics.to_numpy(img)
        counts, _ = np.histogram(img.ravel(), bins=bins, range=value_range)
        total = counts.sum()
        if total == 0:
            return 0.0
        # Normalise the counts to probabilities that sum to one. The
        # `density=True` option of np.histogram yields a density (count divided
        # by total and by bin width), which is not a probability mass.
        probs = counts[counts > 0] / total
        # Written as sum(p * log2(1/p)) so a single occupied bin gives +0.0.
        return float(np.sum(probs * np.log2(1.0 / probs)))

    @staticmethod
    def laplacian_variance(img):
        """Return the variance of the Laplacian (a focus/sharpness measure)."""
        img_gray = ImageQualityMetrics._to_gray(img)
        return float(cv2.Laplacian(img_gray, cv2.CV_64F).var())

    @staticmethod
    def gradient_magnitude(img):
        """Return the mean Sobel gradient magnitude (edge strength)."""
        img_gray = ImageQualityMetrics._to_gray(img)
        sobelx = cv2.Sobel(img_gray, cv2.CV_64F, 1, 0, ksize=3)
        sobely = cv2.Sobel(img_gray, cv2.CV_64F, 0, 1, ksize=3)
        magnitude = np.sqrt(sobelx**2 + sobely**2)
        return float(np.mean(magnitude))

    @staticmethod
    def blur_detection(img):
        """Return the squared standard deviation of the Laplacian response.

        Higher values indicate less blur. This is the same quantity as
        ``laplacian_variance`` (computed through ``cv2.meanStdDev``), not a
        Just-Noticeable-Blur measure; it is kept as a separate metric so the
        set of reported columns stays unchanged.
        """
        img_gray = ImageQualityMetrics._to_gray(img)

        # Apply Laplacian filter
        laplacian = cv2.Laplacian(img_gray, cv2.CV_64F)

        # Only the standard deviation is needed; the mean is discarded.
        _, std = cv2.meanStdDev(laplacian)

        return float(std[0][0]**2)

    @staticmethod
    def noise_estimation(img):
        """Estimate the noise level as the mean absolute residual of a 3x3 median filter.

        8-bit images are filtered as-is; every other dtype is filtered as
        float32 so that fractional pixel values are not truncated.
        """
        img_gray = np.ascontiguousarray(ImageQualityMetrics._to_gray(img))
        if img_gray.dtype != np.uint8:
            img_gray = img_gray.astype(np.float32)

        # Apply median filter (noise reduction)
        median_filtered = cv2.medianBlur(img_gray, 3)

        # Subtract in float64: with unsigned integers a negative difference
        # would wrap around instead of producing a small absolute value.
        noise = np.abs(img_gray.astype(np.float64) - median_filtered.astype(np.float64))

        return float(np.mean(noise))

    @staticmethod
    def compute_all_metrics(img):
        """Compute every image quality metric.

        Returns:
            ``dict`` mapping metric name to its ``float`` value, with keys
            ``brightness``, ``contrast``, ``entropy``, ``laplacian_variance``,
            ``gradient_magnitude``, ``blur_detection`` and ``noise_estimation``.
        """
        # Convert once up front so tensor inputs are not converted per metric.
        img = ImageQualityMetrics.to_numpy(img)
        metrics = {
            'brightness': ImageQualityMetrics.brightness(img),
            'contrast': ImageQualityMetrics.contrast(img),
            'entropy': ImageQualityMetrics.entropy(img),
            'laplacian_variance': ImageQualityMetrics.laplacian_variance(img),
            'gradient_magnitude': ImageQualityMetrics.gradient_magnitude(img),
            'blur_detection': ImageQualityMetrics.blur_detection(img),
            'noise_estimation': ImageQualityMetrics.noise_estimation(img),
        }
        return metrics

In [None]:
# Display the structure of one dataset element before iterating over it.
# NOTE(review): the loop below unpacks each element as
# (img, phase, coords, subject_id) -- confirm the spec shown here matches.
test_data.element_spec

In [None]:
def _metrics_row(element):
    """Build one result record for a dataset element.

    The record holds every image quality metric followed by the element's
    subject id, phase and the two coordinate components.
    """
    img, phase, coords, subject_id = element
    row = ImageQualityMetrics.compute_all_metrics(img.numpy())
    row['subject_id'] = subject_id.numpy()
    row['phase'] = phase.numpy()
    row['coord_x'] = coords[0].numpy()
    row['coord_y'] = coords[1].numpy()
    return row


# One record per image in the dataset.
results = [_metrics_row(element) for element in test_data]

In [None]:
# One row per image: the quality metrics plus subject_id, phase, coord_x and coord_y.
df = pd.DataFrame(results)

In [None]:
# Persist the per-image metrics; index=False leaves the positional row index out of the file.
df.to_csv('image-quality-metrics.csv', index=False)

In [None]:
# Upload the per-image CSV to OSF project `uf2sh` under the same file name.
# NOTE(review): osfclient appears to refuse overwriting an existing remote file
# unless --force / --update is given -- confirm the intended behaviour on re-runs.
!osf -p uf2sh upload image-quality-metrics.csv image-quality-metrics.csv

In [None]:
# summarize metrics by participant, with mean and std
# count number of phase 1 and phase 2 images for each

# Per-participant summary: mean and standard deviation of every quality
# metric, plus the number of phase-1 and phase-2 images for each subject.
metric_columns = [
    'brightness',
    'contrast',
    'entropy',
    'laplacian_variance',
    'gradient_magnitude',
    'blur_detection',
    'noise_estimation',
]
by_subject = df.groupby('subject_id')

# mean/std per metric; the (metric, statistic) column MultiIndex is flattened
# to "<metric>_<statistic>" names for readability.
metric_stats = by_subject[metric_columns].agg(['mean', 'std'])
metric_stats.columns = ['_'.join(col) for col in metric_stats.columns]

# Number of rows per subject whose phase equals 1 and 2 respectively.
df_summary = metric_stats.assign(
    phase_1_count=by_subject['phase'].agg(lambda phases: (phases == 1).sum()),
    phase_2_count=by_subject['phase'].agg(lambda phases: (phases == 2).sum()),
)

df_summary


In [None]:
# Persist the subject-level summary; the index (subject_id, from the groupby)
# is written as the first CSV column.
df_summary.to_csv('image-quality-metrics-subject-level.csv')

In [None]:
# Upload the subject-level CSV to OSF project `uf2sh` under the same file name.
# NOTE(review): osfclient appears to refuse overwriting an existing remote file
# unless --force / --update is given -- confirm the intended behaviour on re-runs.
!osf -p uf2sh upload image-quality-metrics-subject-level.csv image-quality-metrics-subject-level.csv