# Feature extraction
Converted from `01_feature_extraction.py`.

This notebook runs the feature extraction pipeline: loading mask files, computing embeddings, extracting image features, and computing CLIP-based text similarities.

## Imports

In [None]:
import os
import numpy as np
import pandas as pd
import pickle
from tqdm import tqdm
from PIL import Image
import warnings

import phenopype as pp
import phenopype_plugins as pp_plugins
import torch
from torch.nn.functional import normalize

# local utils
from utils import utils_python

## Prepare 

In [None]:
# Work from the repository root so that the relative data paths used by the
# following cells resolve. NOTE(review): this is a machine-specific absolute path.
project_root = r"D:/git-repos/mluerig/nymphalid-phenomics/"
os.chdir(project_root)

# Optional: silence Python warnings globally (the pipeline emits a lot of them).
warnings.filterwarnings("ignore")

# Optional: switch off phenopype's verbose output.
pp.config.verbose = False

# Number of CUDA devices torch can see; displayed as the cell output.
torch.cuda.device_count()

## Get files

In [None]:
# Root folder holding one sub-folder per species, each containing mask images.
input_dir = "data_raw/segmentation_masks_clean/all_masks"

# Columns of the resulting table; declared up front so the frame has them even
# when no file is found (otherwise the sort below raises KeyError).
mask_columns = ["genus_name", "species_name", "mask_name", "mask_path"]

# One record per mask file, keyed by file name.
# NOTE(review): because the key is the bare file name, a file name that occurs
# under two species folders is kept only once (the later folder wins).
file_dict = {}
for species_name in sorted(os.listdir(input_dir)):  # sorted -> deterministic order
    species_dir = os.path.join(input_dir, species_name)
    # Skip stray files next to the species folders; os.listdir() on a
    # non-directory would raise NotADirectoryError.
    if not os.path.isdir(species_dir):
        continue
    # Folder names are split on "_" and the first token is taken as the genus.
    genus_name = species_name.split("_")[0]
    for file_name in sorted(os.listdir(species_dir)):
        file_dict[file_name] = {
            "genus_name": genus_name,
            "species_name": species_name,
            "mask_name": file_name,
            "mask_path": os.path.join(species_dir, file_name),
        }

# Build the table, order it by genus / species / file name, and use a clean 0..n-1 index.
data_imgs = (
    pd.DataFrame(list(file_dict.values()), columns=mask_columns)
    .sort_values(by=["genus_name", "species_name", "mask_name"], ascending=True)
    .reset_index(drop=True)
)


In [None]:
# Encoder setup: the project helper returns the model together with the
# preprocessing callable that is applied to each PIL image before encoding.
model, preprocessing = utils_python.setup_unicom("ViT-L/14@336px")

# CSV file the embedding table is written to.
path_data_results = "data_raw/tables/embeddings.csv"

# Results so far: mask_name -> embedding vector (starts empty).
dict_results = {}

## Embeddings 


In [None]:
# Choose the compute device.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Running on device: {device}")

# The input tensor below is sent to `device`, so the model has to live there too.
# Both calls are no-ops if `setup_unicom` already did this (presumably it does —
# TODO confirm); eval() makes sure layers such as dropout run in inference mode.
model.to(device)
model.eval()

# pick an example row (change index as needed)
sample_idx = 0
sample_row = data_imgs.iloc[sample_idx]
print("Embedding:", sample_row['mask_name'])

## open the image, force 3-channel RGB, and run the encoder's preprocessing
img = Image.open(sample_row['mask_path']).convert('RGB')
tens = preprocessing(img)        # CPU tensor
tens = tens.unsqueeze(0).to(device)  # add a batch dimension, move to device

## encode without tracking gradients; mixed precision only on CUDA
with torch.no_grad():
    if device == 'cuda':
        with torch.amp.autocast(device_type='cuda'):
            out = model(tens)
    else:
        out = model(tens)

## normalize, then bring back to the CPU as a 1-D numpy vector
out = normalize(out)
embedding_vector = out.cpu().squeeze(0).detach().numpy()

In [None]:
# Configure
batch_size = 16 ## modify according to your GPU

# Select rows that still need processing (anything already in dict_results is skipped)
to_process = [row for _, row in data_imgs.iterrows() if row['mask_name'] not in dict_results]
if not to_process:
    print('Nothing to process (all images already embedded).')
else:
    total = len(to_process)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Running on device: {device}; batch_size={batch_size}")

    # Batches are sent to `device`, so the model must be there as well. Both calls
    # are no-ops if `setup_unicom` already did this (presumably — TODO confirm).
    model.to(device)
    model.eval()

    # Context manager guarantees the bar is closed even if a batch raises.
    with tqdm(total=total, desc='Embedding images', leave=True) as pbar:
        for start in range(0, total, batch_size):
            batch_rows = to_process[start:start + batch_size]
            imgs = []
            names = []

            # load + preprocess on CPU; unreadable files are reported and skipped
            for row in batch_rows:
                try:
                    with Image.open(row['mask_path']) as img_file:
                        img = img_file.convert('RGB')
                    tens = preprocessing(img)
                    imgs.append(tens)
                    names.append(row['mask_name'])
                except Exception as e:
                    print(f"Skipped {row['mask_path']}: {e}")

            if imgs:
                ## stack into one batch tensor and move it to the device
                batch_tensor = torch.stack(imgs, dim=0).to(device)

                # forward pass without gradients; mixed precision only on CUDA
                with torch.no_grad():
                    if device == 'cuda':
                        # torch.amp.autocast replaces the deprecated torch.cuda.amp.autocast
                        with torch.amp.autocast(device_type='cuda'):
                            out = model(batch_tensor)
                    else:
                        out = model(batch_tensor)
                    out = normalize(out)
                out_np = out.cpu().detach().numpy()
                for nm, vec in zip(names, out_np):
                    dict_results[nm] = vec

            ## progress update: count every row of the batch, skipped ones included,
            ## so the bar always reaches `total`
            pbar.update(len(batch_rows))

# final save: one row per image, first column is the mask name
data_results = pd.DataFrame.from_dict(dict_results, orient="index")
data_results = data_results.reset_index().rename(columns={'index': 'mask_name'})
# make sure the output folder exists so the run does not fail at the very end
results_dir = os.path.dirname(path_data_results)
if results_dir:
    os.makedirs(results_dir, exist_ok=True)
data_results.to_csv(path_data_results, index=False)


In [None]:
# Preview only: echoing the whole dict would print every embedding vector in full.
print(f"{len(dict_results)} embeddings")
{name: vec[:5] for name, vec in list(dict_results.items())[:3]}

## Handcrafted features

In [None]:
# Fresh results container for this section: mask_name -> feature dict.
# (Rebinds the name used for the embeddings above.)
dict_results = {}

# CSV file the handcrafted-feature table is written to.
path_data_results = "data_raw/tables/features.csv"

In [None]:
# save_intervall: write a checkpoint CSV every N stored images
# skip_done:      skip images that already have an entry in dict_results
save_intervall, skip_done = 10000, True

# colour-binning settings; identical for every image, so defined once up front
bins, clust = 5, 9


def _write_feature_table(results, path):
    """Write ``results`` ({mask_name: feature dict}) to ``path`` as a CSV table.

    The parent directory is created if it does not exist. Returns the DataFrame
    that was written (one row per image, first column ``mask_name``).
    """
    out_dir = os.path.dirname(path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    table = pd.DataFrame.from_dict(results, orient="index")
    table = table.reset_index().rename(columns={'index': 'mask_name'})
    table.to_csv(path, index=False)
    return table


pbar = tqdm(total=len(data_imgs), position=0, leave=False, desc="Extracting features...")
for _, row in data_imgs.iterrows():
    if skip_done and row["mask_name"] in dict_results:
        pbar.update(1)
        continue

    # First three channels are used as the image, the fourth as the mask.
    image_BGRA = pp.load_image(row["mask_path"])
    image, mask = image_BGRA[:,:,:3], image_BGRA[:,:,3]
    annotations = pp.segmentation.detect_contour(mask, keep="largest")

    ## blur image, then zero out every pixel where the mask is 0
    image_blurred = pp.core.preprocessing.blur(image, 3)
    image_blurred[mask == 0] = [0,0,0]

    ## color moments on the blurred image as loaded, then in LAB and HLS.
    ## Each call's result is read back from the same annotation slot, so the
    ## value must be picked up before the next call.
    annotations = pp.measurement.compute_color_moments(image_blurred, annotations)
    color_bgr = annotations["texture_features"]["a"]["data"]["texture_features"][0]

    image_lab = pp.core.preprocessing.decompose_image(image_blurred, col_space="lab")
    annotations = pp.measurement.compute_color_moments(image_lab, annotations, channel_names=["light", "grre", "blyl"])
    color_lab = annotations["texture_features"]["a"]["data"]["texture_features"][0]

    image_hls = pp.core.preprocessing.decompose_image(image_blurred, col_space="hls")
    annotations = pp.measurement.compute_color_moments(image_hls, annotations, channel_names=["hue", "lum", "sat"])
    color_hls = annotations["texture_features"]["a"]["data"]["texture_features"][0]

    ## color bins
    # NOTE(review): image_lab is recomputed although it already exists above;
    # kept as-is in case compute_color_moments modifies its input — confirm and drop.
    image_lab = pp.core.preprocessing.decompose_image(image_blurred, col_space="lab")
    recolor_res_hist_lab = pp_plugins.measurement.recolorize_binclust(
        image_lab, mask, method="histogram", bins_per_channel=bins, blur=1)
    # NOTE(review): unlike the histogram call, no mask is passed here — confirm
    # this is intended. k-means is also presumably unseeded; verify reproducibility.
    recolor_res_clust_lab = pp_plugins.measurement.recolorize_binclust(
        image_lab, method="kmeans", n_clusters=clust, blur=1)
    hue_bins = utils_python.channel_to_bins(
        image_hls[:,:,0], mask=mask, blur=1, n_bins=36)

    # Per-pixel assignments from each binning method, summarised by
    # channel_parsimony and stored under "color_<method>_<stat>" keys.
    analyses = {
        "hist_lab": recolor_res_hist_lab["pixel_assignments"],
        "clust_lab": recolor_res_clust_lab["pixel_assignments"],
        "split_hue": hue_bins
    }
    parsimony = {}
    for key, data in analyses.items():
        parsimony.update({f"color_{key}_{k}": v
                          for k, v in utils_python.channel_parsimony(data, mask).items()})

    ## DFT statistics for every HLS and LAB channel
    dft_stats = {}
    for chan in ["hue", "lum", "sat"]:
        dft_stats.update(pp.measurement.compute_DFT_stats(image_blurred, col_space="hls", channel=chan))
    for chan in ["light", "grre", "blyl"]:
        dft_stats.update(pp.measurement.compute_DFT_stats(image_blurred, col_space="lab", channel=chan))

    ## concat all feature groups into one flat dict for this image
    features = {**color_bgr, **color_hls, **color_lab, **parsimony, **dft_stats}
    dict_results[row["mask_name"]] = features
    pbar.update(1)

    ## periodic checkpoint so a crash late in the run does not lose everything
    if len(dict_results) % save_intervall == 0:
        _write_feature_table(dict_results, path_data_results)
pbar.close()

# final save
data_results = _write_feature_table(dict_results, path_data_results)