In [None]:
from pathlib import Path
import numpy as np
import pandas as pd
from skimage.filters import gaussian, laplace
from skimage.feature import hog, SIFT
from skimage.color import rgb2gray
from skimage.exposure import histogram
from scipy.signal import find_peaks
from scipy.stats import entropy

import torch
from torchvision import transforms, models

from PIL import Image
from tqdm.auto import tqdm

In [None]:
# Root folder that is scanned for input images.
DATA_PATH = './data'
# Folder that receives the extracted per-replicate feature CSV files.
OUTPUT_DIR = './features'

# This variable will control whether to force extraction even if a CSV file exists
FORCE_EXTRACT = False

In [None]:
# Directory structure assumed is ./data/{dataset}/{class}/{replicate}/
base_directory = Path(DATA_PATH)

# Create the output folder together with any missing parents; this is a no-op
# when the folder already exists, so the cell can safely be re-run.
output_dir = Path(OUTPUT_DIR)
output_dir.mkdir(parents=True, exist_ok=True)

## Helper functions for I/O

In [None]:
def save_to_csv(df: pd.DataFrame, csv_path: str) -> None:
    """
    Saves the given DataFrame into a CSV file after expanding each array-valued
    feature column into one scalar column per element.

    The 'hog', 'log', 'vgg' and 'resnet' columns must each hold equal-length 1-D
    arrays. Element ``idx`` of feature ``name`` is written as column
    ``{name}_{idx}``; the number of columns follows the actual array length.
    All other columns are written unchanged, ahead of the expanded ones, and the
    DataFrame index is not written.

    Args:
        df: DataFrame containing the four array-valued feature columns.
        csv_path: Destination path of the CSV file.

    Raises:
        ValueError: If a feature column does not stack into a 2-D array
            (for example the DataFrame is empty or the arrays differ in length).
    """
    feature_names = ('hog', 'log', 'vgg', 'resnet')

    # Scalar / metadata columns go first, in their original order.
    columns = {col: df[col] for col in df.columns if col not in feature_names}

    for name in feature_names:
        feats = np.array(list(df[name]))
        if feats.ndim != 2:
            raise ValueError(
                f"Column '{name}' must contain equal-length 1-D arrays; "
                f"got stacked shape {feats.shape}."
            )
        # Width is taken from the data instead of being hard-coded.
        for idx in range(feats.shape[1]):
            columns[f'{name}_{idx}'] = feats[:, idx]

    pd.DataFrame(columns).to_csv(csv_path, index=False)

def read_convert_csv(csv_path: str) -> pd.DataFrame:
    """
    Reads a CSV file, converts individual feature columns back into original feature columns and returns the DataFrame.

    For each of 'hog', 'log', 'vgg' and 'resnet', every column named
    ``{name}_{integer}`` is collected (ordered by the integer), packed row-wise
    into a single array-valued column called ``name`` and then dropped. Columns
    such as ``hog_entropy`` do not match that pattern and are kept as they are.
    The four packed columns are appended after the remaining columns.

    Args:
        csv_path: Path of a CSV file holding the expanded feature columns.

    Returns:
        DataFrame with one array per row in each of the four feature columns.

    Raises:
        KeyError: If no ``{name}_{integer}`` column exists for one of the features.
    """
    df = pd.read_csv(csv_path)

    # Discover the expanded columns from the header instead of assuming a fixed width.
    feature_cols = {}
    for name in ('hog', 'log', 'vgg', 'resnet'):
        prefix = f'{name}_'
        indexed = sorted(
            (int(col[len(prefix):]), col)
            for col in df.columns
            if col.startswith(prefix) and col[len(prefix):].isdecimal()
        )
        if not indexed:
            raise KeyError(f"No '{prefix}<index>' columns found in {csv_path}")
        feature_cols[name] = [col for _, col in indexed]

    expanded_cols = []
    for name, cols in feature_cols.items():
        # One 1-D array per row.
        df[name] = list(df[cols].to_numpy())
        expanded_cols.extend(cols)

    return df.drop(expanded_cols, axis=1)

## Feature extractor class

In [None]:
class ComplexityFeatureExtractor:
    """
    Computes a set of image descriptors (colour statistics, spectral peaks,
    SIFT keypoints, pixel entropy, HoG, LoG, VGG16 and ResNet50 activations)
    for a single image and packs them into one pandas Series.
    """

    def __init__(self):
        """
        Initializes feature extractors and preprocessing tools.
        """

        # Initialize SIFT extractor
        self.sift_extractor = SIFT()

        # Setting up VGG16 and ResNet50 for feature extraction.
        # Device choice: "mps" when available, otherwise "cuda", otherwise "cpu".
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device("mps" if torch.backends.mps.is_available() else self.device)
        # Only the `.features` sub-module of VGG16 is used.
        self.vgg_model = models.vgg16(weights='VGG16_Weights.DEFAULT').features.to(self.device).eval()
        resnet_model = models.resnet50(weights='ResNet50_Weights.DEFAULT').to(self.device)
        # Keep every child module of ResNet50 except the last one.
        self.resnet_model = torch.nn.Sequential(*(list(resnet_model.children())[:-1])).eval()
        self.preprocess = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def color_modes(self, img: np.array) -> int:
        """
        1. Colour-distribution statistic.

        Computes the per-channel mean and standard deviation (over the first two
        axes) and returns how many values of `img` lie strictly inside
        mean +/- 2 * std of their channel.

        NOTE(review): the returned number is a count of in-range values, not a
        count of distribution modes - confirm this is the intended measure.
        """
        color_mean = np.mean(img, axis=(0, 1))
        color_std = np.std(img, axis=(0, 1))
        upper_bound = color_mean + 2 * color_std
        lower_bound = color_mean - 2 * color_std
        return np.sum((img > lower_bound) & (img < upper_bound))

    def rank_unique_colors(self, img: np.array, delta: int = 10) -> int:
        """
        2. Number of distinct colours that occur more than `delta` times.

        The image is viewed as a list of 3-component colours, so `img` must have
        a size divisible by 3 (an H x W x 3 array in `extract_all_features`).
        """
        _, counts = np.unique(img.reshape(-1, 3), axis=0, return_counts=True)
        # Vectorised equivalent of filtering the colours by count in a Python loop.
        return int(np.count_nonzero(counts > delta))

    def spectral_peaks(self, img: np.array) -> int:
        """
        3. Number of spectral peaks above mean + 2 * sigma.

        Takes the 2-D FFT magnitude of the grayscale image, flattens it and
        counts the peaks found by `find_peaks` whose height exceeds the mean
        plus two standard deviations of the magnitude spectrum.
        """
        # Convert to grayscale
        gray = rgb2gray(img)
        f = np.fft.fft2(gray)
        fshift = np.fft.fftshift(f)
        magnitude_spectrum = np.abs(fshift)
        mean_val = np.mean(magnitude_spectrum)
        std_val = np.std(magnitude_spectrum)
        peaks, _ = find_peaks(magnitude_spectrum.flatten(), height=(mean_val + 2 * std_val))
        return len(peaks)

    def pixel_intensity_entropy(self, img: np.array) -> float:
        """
        Calculate the entropy of the pixel-value histogram.

        All values of `img` are flattened into one histogram, so for a
        multi-channel image every channel contributes to the same histogram.
        """
        # Flatten the image and compute histogram
        hist = self.compute_histogram(img.flatten())

        # Compute entropy
        return entropy(hist)

    def log_features(self, img: np.array, sigma: float = 1.0) -> np.array:
        """
        6. LoG filters
        Compute the Laplacian of Gaussian for an image.

        The image is converted to grayscale, blurred with a Gaussian of the
        given `sigma`, and the Laplacian of the blurred image is returned.
        """
        gray = rgb2gray(img)
        # Apply Gaussian blur
        blurred = gaussian(gray, sigma=sigma)
        # Compute Laplacian
        log = laplace(blurred)
        return log

    def hog_features(self, img: np.array) -> tuple:
        """
        4. HoG
        Extract Histogram of Oriented Gradients features.

        Returns a `(features, hog_image)` tuple computed on the grayscale image
        with 8x8 pixels per cell and 2x2 cells per block.
        """
        gray = rgb2gray(img)
        features, hog_img = hog(gray, pixels_per_cell=(8, 8), cells_per_block=(2, 2), visualize=True)
        return features, hog_img

    def sift_features(self, img: np.array) -> tuple:
        """
        5. SIFT
        Extract SIFT features.

        Returns the `(keypoints, descriptors)` found on the grayscale image.
        """
        gray = rgb2gray(img)
        self.sift_extractor.detect_and_extract(gray)
        keypoints = self.sift_extractor.keypoints
        descriptors = self.sift_extractor.descriptors
        return keypoints, descriptors

    def vgg16_features(self, img: np.array) -> np.array:
        """
        7. VGG16
        Extract VGG16 features.

        The array is turned into a PIL image, preprocessed (resize, centre crop,
        tensor conversion, normalisation) and passed through the VGG16 feature
        layers without gradient tracking. The flattened activations are returned.
        """
        img = Image.fromarray(img)
        img_tensor = self.preprocess(img).unsqueeze(0).to(self.device)
        with torch.no_grad():
            features = self.vgg_model(img_tensor)
        return features.cpu().numpy().flatten()

    def resnet_features(self, img: np.array) -> np.array:
        """
        Extract features using ResNet50.

        Same preprocessing as `vgg16_features`; the image is passed through the
        truncated ResNet50 and the flattened output is returned.
        """
        img = Image.fromarray(img)
        img_tensor = self.preprocess(img).unsqueeze(0).to(self.device)
        with torch.no_grad():
            features = self.resnet_model(img_tensor)
        return features.cpu().numpy().flatten()

    @classmethod
    def feat2hist(cls, arr: np.array) -> np.array:
        """
        Convert a feature array to a histogram.

        The array is rescaled to the range [0, 255], cast to uint8 (fractional
        parts are dropped by the cast) and histogrammed.
        """
        normalized_arr = cls.normalize_array(arr)
        hist = cls.compute_histogram(normalized_arr.astype(np.uint8))
        return hist

    @staticmethod
    def compute_histogram(arr: np.array, bins: int = 256) -> np.array:
        """
        Compute histogram of an array.

        The assertion requires the bin centres to be exactly 0..255, so in
        practice this expects uint8 input.
        NOTE(review): `bins` other than 256 would fail the assertion - confirm
        whether the parameter is meant to be configurable.
        """
        hist, centers = histogram(arr.flatten(), nbins=bins, source_range='dtype')
        assert (centers == np.arange(256)).all()
        return hist

    @staticmethod
    def normalize_array(arr: np.array, new_min: float = 0, new_max: float = 255) -> np.array:
        """
        Normalize an array to a new specified min and max value.

        A constant array (max == min) has no range to stretch; it is mapped to
        `new_min` everywhere instead of dividing by zero and producing NaN.
        """
        old_min, old_max = np.min(arr), np.max(arr)
        span = old_max - old_min
        if span == 0:
            return np.full(np.shape(arr), new_min, dtype=np.float64)
        normalized_arr = (arr - old_min) / span * (new_max - new_min) + new_min
        return normalized_arr

    def extract_all_features(self, img_path: str) -> pd.Series:
        """
        Extract all features and return them as a row for a pandas DataFrame.

        Args:
            img_path: Path of the image (a `str` or `pathlib.Path`). The file is
                expected at `.../{dataset}/{class}/{replicate}/{filename}`; the
                three parent folder names are stored in the result.

        Returns:
            A Series with the file metadata, the scalar features and the four
            256-bin histograms ('log', 'hog', 'vgg', 'resnet').
        """
        # Accept plain strings as well as Path objects.
        img_path = Path(img_path)

        # Read the image; the context manager closes the file handle, and the
        # RGB conversion guarantees the 3-channel layout the extractors assume
        # (PNG files may be RGBA, palette or grayscale).
        with Image.open(img_path) as pil_img:
            img = np.array(pil_img.convert('RGB'))

        # Extract features
        color_modes_val = self.color_modes(img)
        rank_colors = self.rank_unique_colors(img)
        spectral = self.spectral_peaks(img)
        sift_keypoints, sift_descriptors = self.sift_features(img)
        pixel_entropy = self.pixel_intensity_entropy(img)
        hog_feats, hog_img = self.hog_features(img)
        hog_hist = self.feat2hist(hog_feats)
        log_feats = self.log_features(img)
        log_hist = self.feat2hist(log_feats)
        vgg_feats = self.vgg16_features(img)
        vgg_hist = self.feat2hist(vgg_feats)
        resnet_feats = self.resnet_features(img)
        resnet_hist = self.feat2hist(resnet_feats)

        # Collate features into a dictionary
        data = {
            "filename": img_path.name,
            "dataset": img_path.parent.parent.parent.name,
            "class": img_path.parent.parent.name,
            "replicate": img_path.parent.name,
            "color_modes": color_modes_val,
            "rank_colors": rank_colors,
            "spectral": spectral,
            "sift": len(sift_keypoints),
            "pixel_entropy": pixel_entropy,
            "log": log_hist,
            "hog": hog_hist,
            "vgg": vgg_hist,
            "resnet": resnet_hist
        }

        return pd.Series(data)





## Extract features

In [None]:
extractor = ComplexityFeatureExtractor()

# Get list of datasets
datasets = sorted([ds for ds in base_directory.iterdir() if ds.is_dir()])
for dataset in tqdm(datasets, desc="Datasets", position=0, leave=True):
    # Iterate over classes within each dataset
    classes = sorted([g for g in dataset.iterdir() if g.is_dir()])

    for class_ in tqdm(classes, desc=f"Classes in {dataset.name}", position=1, leave=False):
        # Iterate over replicates within each class
        replicates = sorted([rep for rep in class_.iterdir() if rep.is_dir()])

        for replicate in tqdm(replicates, desc=f"Replicates in {class_.name}", position=2, leave=False):

            # Resolve this replicate's output path first: the skip check below
            # must look at the current replicate's file, and the name has to be
            # defined on the very first iteration of a fresh kernel.
            csv_filename = output_dir.joinpath(dataset.name, class_.name, f"{replicate.name}.csv")

            # If the CSV file already exists and FORCE_EXTRACT is False, skip this replicate
            if csv_filename.exists() and not FORCE_EXTRACT:
                continue

            # Sorted so that the row order of the CSV is reproducible.
            img_list = sorted(replicate.glob('*.png'))
            if not img_list:
                continue

            csv_filename.parent.mkdir(parents=True, exist_ok=True)

            all_feats = [
                extractor.extract_all_features(img_path)
                for img_path in tqdm(img_list, desc=f"Extracting from {replicate.name}", position=3, leave=False)
            ]

            # One row per image; the entropy of each histogram is the scalar feature.
            df = pd.concat(all_feats, axis=1).T
            df['hog_entropy'] = df['hog'].apply(entropy)
            df['log_entropy'] = df['log'].apply(entropy)
            df['vgg_entropy'] = df['vgg'].apply(entropy)
            df['resnet_entropy'] = df['resnet'].apply(entropy)

            # Save using the desired naming format
            save_to_csv(df, csv_filename)

## Combine csv files into one & validate

In [None]:
# Maps the internal scalar-feature column names to the display names used in
# the combined table.
rename_columns = {
    'color_modes': 'Color distribution modes',
    'rank_colors': 'Rank #unique colors',
    'spectral': 'Spectral distribution modes',
    'sift': 'SIFT',
    'pixel_entropy': 'Pixel entropy',
    'hog_entropy': 'HoG',
    'log_entropy': 'LoG',
    'vgg_entropy': 'VGG16',
    'resnet_entropy': 'ResNet50',
}

In [None]:
# Read all csv files (sorted so the load order does not depend on the filesystem)
csv_files = [read_convert_csv(csv_file) for csv_file in sorted(output_dir.rglob('*.csv'))]

# Concatenate csv files; ignore_index avoids duplicate row labels, since every
# per-replicate file starts counting its rows from 0 again.
df_final = pd.concat(csv_files, ignore_index=True)

# Rename columns
df_final = df_final.rename(rename_columns, axis=1)

# Sort based on dataset, class and replicate; filename is the tiebreaker so the
# final row order is fully deterministic.
df_final = (
    df_final
    .sort_values(['dataset', 'class', 'replicate', 'filename'], axis=0)
    .reset_index(drop=True)
)

df_final

In [None]:
# Sanity check: number of png files below the data folder vs. number of rows.
(len(tuple(base_directory.rglob('*.png'))), len(df_final))

In [None]:
# Write the combined table next to the output folder.
save_to_csv(df_final, output_dir.parent / 'all_features.csv')

In [None]:
# Round-trip check: load the combined CSV back through read_convert_csv.
df_validate = read_convert_csv(output_dir.parent / 'all_features.csv')
df_validate

In [None]:
# Write a copy without the four array-valued feature columns.
df_validate.drop(columns=['hog', 'log', 'vgg', 'resnet']).to_csv(
    output_dir.parent / 'all_features_truncated.csv',
    index=False,
)

In [None]:
# Load the truncated CSV back to confirm it is readable.
df_validate_truncated = pd.read_csv(output_dir.parent / 'all_features_truncated.csv')
df_validate_truncated

In [None]:
# Show which columns the truncated table contains.
df_validate_truncated.columns