In [17]:
import matplotlib.pyplot as plt
import numpy as np
import tqdm
from hsdfmpm.hsdfm.utils import find_cycles, gabor_filter_bank, leastsq_reflectance, mask_by_k_clustering
from hsdfmpm.utils import apply_kernel_bank
import pandas as pd
from hsdfmpm.hsdfm import HyperspectralImage, MergedHyperspectralImage
from pathlib import Path

from photon_canon.contrib.bio import hemoglobin_mus, fit_hemoglobin_model
from photon_canon.lut import LUT
from photon_canon.contrib.bio import wl, eps
import cv2
import imageio.v3 as iio
from datetime import datetime, timedelta
import warnings

# HSDFM Processing
1. Load images and metadata
2. Normalize to integration time
3. Normalize to standard and background
4. Perform least-squares fit
5. Apply Gabor filter
6. Apply adaptive threshold to acquire mask
7. Apply mask to hyperstack
8. Fit masked hyperstack with MCLUT

In [33]:
# Dir stuff
root = Path(r'E:\new df\POC Study')
processed = root /'Processed'
processed.mkdir(exist_ok=True)

# Find raw data
cycles = find_cycles(root / 'Animals')

# Load normalization data
standard_paths = find_cycles(root / 'Standards')
background = MergedHyperspectralImage(image_paths=find_cycles(root / 'Background' / 'dark_04282025'))
background.normalize()

[None, None, None]

In [34]:
def parse_categorical(cycle):
    # Prepare categorical data
    animal = cycle.parts[4]
    date = datetime.strptime(cycle.parts[5], '%m%d%Y')
    oxygen = cycle.parts[6]
    fov = cycle.parts[7]
    polarized = 'polar' in str(cycle) and not 'unp' in str(cycle)
    return animal, date, oxygen, fov, polarized

In [35]:
def find_dated_dir(date, paths):
    matches = []
    for path in paths:
        if date.strftime('%m%d%Y') in str(path):
            matches.append(path)
    if not matches:
        warnings.warn(f'No paths found for date {date}. Auto-incrementing ahead one day', category=RuntimeWarning, stacklevel=2)
        date += timedelta(days=1)
        return find_dated_dir(date, paths)
    return matches

In [36]:
lut = LUT(dimensions=['mu_s', 'mu_a'], scale=50000)
def model(wavelength, a, b, t, s):
    mu_s, mu_a, _ = hemoglobin_mus(a, b, t, s, wavelength)
    r = lut(mu_s, mu_a, extrapolate=True)
    return r

In [None]:
# Prep reusable parts
f = np.geomspace(0.01, 1, 16)
gabor_bank = gabor_filter_bank(frequency=f, sigma_x=4/f, sigma_y=1/f)

output = []

for cycle in tqdm.tqdm(cycles):
    # Parse categorical data
    animal, date, oxygen, fov, polarization = parse_categorical(cycle)

    # Load standard
    standard_path = find_dated_dir(date, standard_paths)
    if polarization:
        standard_path = [path for path in standard_path if 'polar' in str(path)]
    else:
        standard_path = [path for path in standard_path if 'polar' not in str(path)]
    standard = MergedHyperspectralImage(image_paths=find_dated_dir(date, standard_paths))
    standard.normalize()

    # Load cycle
    hs = HyperspectralImage(image_path=cycle, standard=standard, background=background)

    # Normalize (automatically normalizes to integration time then the standard/background
    hs.normalize()

    # Resize to 128x128
    hs.bin(bin_factor=2048 // 128)

    # Get variables for naive fit
    selected = np.isin(wl, hs.metadata['Wavelength'])
    e = np.log(10) * eps[:, selected] / 64500

    # Fit the image with naive lsq and apply gabor bank
    naive_fit = leastsq_reflectance(hs.image, e)
    hb_index = naive_fit[0] + naive_fit[1]
    gabor_response = apply_kernel_bank(hb_index, gabor_bank)

    # Create and mask
    mask = cv2.adaptiveThreshold((gabor_response / gabor_response.max() * 255).astype(np.uint8), 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 15, 0)
    hs.apply_mask(mask.astype(bool))

    # Fit image
    a, b, thb, so2 = fit_hemoglobin_model(model, hs.metadata['Wavelength'], hs.image, guess=[1, 1, 1, 1], bounds=[(0, 0, 0, 0), (np.inf, np.inf, np.inf, 1)])
    r_exp = np.zeros_like(hs.image)
    for i in range(hs.image.shape[1]):
        for j in range(hs.image.shape[2]):
            r_exp[:, i, j] = model(hs.metadata['Wavelength'], a[i, j], b[i, j], thb[i, j], so2[i, j])
    chi_sq = ( 1 / (np.count_nonzero(~np.isnan(hs), axis=0) - 4)) * np.nansum(((hs - r_exp) ** 2) / np.nanvar(hs, axis=0, keepdims=True), axis=0)

    # Update output
    output.append([animal, date, oxygen, fov, polarization,
                   np.mean(a[mask]), np.mean(b[mask]), np.mean(thb[mask]), np.mean(so2[mask]), str(cycle)
                   ])

    out_path = Path(animal, oxygen, fov, f'{'polarized' if polarization else 'nonpolarized'}')
    iio.imwrite(out_path / 'scatter_a.tiff', a)
    iio.imwrite(out_path / 'scatter_b.tiff', b)
    iio.imwrite(out_path / 'thb.tiff', thb)
    iio.imwrite(out_path / 'so2.tiff', so2)
    iio.imwrite(out_path / 'mask.tiff', mask)
    iio.imwrite(out_path / 'hsdfm_chi_sq.tiff', chi_sq)
    with open(out_path / 'hsdfm_img_model.json', 'w') as f:
        f.write(hs.model.dumps_json())

df = pd.DataFrame(output, columns=['Animal', 'Date', 'Oxygen', 'FOV', 'Polarization', 'Mean Scatter A', 'Mean Scatter B', 'Mean THb', 'Mean sO2', 'Full data path'])
df.to_csv(processed / 'hsdfm_output.csv')

  0%|          | 0/120 [00:00<?, ?it/s]

  0%|          | 0/192 [00:00<?, ?it/s]

  0%|          | 0/16384 [00:00<?, ?it/s]

