In [None]:
# default_exp utils

# Utils

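> Utility functions, including helpers for loading images into xarray DataArrays and for computing region properties.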

## Imports

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
import os
import re
from functools import partial, wraps
from glob import glob
from typing import List

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
import xarray as xr
from dask import delayed
from dask_image.imread import imread
from matplotlib import pyplot as plt
from scipy.stats import mode
from skimage import measure, segmentation

In [None]:
#export
#slow
import pyclesperanto_prototype as cle

## Preparing xarray DataArray of images from file path globs

In [None]:
#export
def clean_img_names(img_path_glob: str, img_name_regex: str):
    """Extract an image name from every file matching a glob pattern.

    Parameters
    ----------
    img_path_glob : str
        Glob pattern handed to ``glob``; the matching paths are processed in
        sorted order.
    img_name_regex : str
        Regular expression applied with ``re.findall`` to the basename
        (directory part stripped) of each matching path. The first match is
        used as the image name.

    Returns
    -------
    list
        One entry per matching file, in sorted path order. Empty if the glob
        matches nothing.

    Raises
    ------
    IndexError
        If the regular expression does not match one of the basenames.
    """
    names = []
    for path in sorted(glob(img_path_glob)):
        matches = re.findall(img_name_regex, os.path.basename(path))
        # Only the first match is kept; an unmatched basename raises IndexError.
        names.append(matches[0])
    return names

An example of `clean_img_names`, extracting the name `feed` from the files matching `docs/fe*`:

In [None]:
# Apply the regex "feed" to the basename of every path matching docs/fe*.
clean_img_names("docs/fe*", r"feed")

['feed']

In [None]:
# Passes only if exactly one path matches docs/fe* and its basename contains "feed".
assert clean_img_names("docs/fe*", r"feed") == ["feed"]

In [None]:
#export
def check_lists_identical(list_of_lists):
    """Verify that every list in ``list_of_lists`` equals the first one.

    Each entry is compared against the first entry with ``np.array_equal``,
    so both the length and the contents must agree.

    Parameters
    ----------
    list_of_lists : sequence of sequences
        The lists to compare. An empty sequence is accepted and treated as
        trivially identical.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If any entry differs from the first one, whether in length or in
        content.
    """
    if len(list_of_lists) == 0:
        # Nothing to compare, so there is nothing that can disagree.
        return

    reference = list_of_lists[0]
    for candidate in list_of_lists:
        if not np.array_equal(candidate, reference):
            # The check is about equality, not only length, so say so.
            raise ValueError("not all lists are identical!")

In [None]:
#export
def img_path_to_xarr(img_name_regex, pixel_size=0.275, **channel_path_globs):
    """Load per-channel image stacks into one labelled ``xarray.DataArray``.

    For every ``channel_name=glob_pattern`` keyword argument the matching
    files are read with ``imread`` and their image names are extracted with
    ``clean_img_names``. The per-channel stacks are then stacked along a new
    leading axis.

    Parameters
    ----------
    img_name_regex : str
        Regular expression passed to ``clean_img_names`` to derive an image
        name from each file name.
    pixel_size : float, optional
        Spacing used to build the ``y`` and ``x`` coordinates
        (coordinate = pixel index * ``pixel_size``). Presumably micrometres
        per pixel -- TODO confirm the unit.
    **channel_path_globs : str
        Mapping of channel name to the glob pattern of that channel's image
        files. At least one channel is required.

    Returns
    -------
    xarray.DataArray
        Array with dims ``("channel", "img_name", "y", "x")``.

    Raises
    ------
    ValueError
        If no channel glob is given, or if the extracted image names are not
        identical across channels (raised by ``check_lists_identical``).
    """
    if not channel_path_globs:
        raise ValueError("at least one channel path glob must be provided")

    imgs = list()
    channels = list()
    img_names = list()

    for channel_name, img_path_glob in channel_path_globs.items():
        channels.append(channel_name)
        imgs.append(imread(img_path_glob))
        img_names.append(clean_img_names(img_path_glob, img_name_regex))

    # Every channel must cover the same images, in the same order.
    check_lists_identical(img_names)

    # Build the spatial coordinates as index * pixel_size. ``np.arange`` with
    # a float step can return one element too many because of rounding in
    # (stop - start) / step, which would make the coordinate length disagree
    # with the image shape.
    n_rows = imgs[0].shape[1]
    n_cols = imgs[0].shape[2]

    return xr.DataArray(
        data=da.stack(imgs),
        coords=[
            channels,
            img_names[0],
            np.arange(n_rows) * pixel_size,
            np.arange(n_cols) * pixel_size,
        ],
        dims=["channel", "img_name", "y", "x"],
    )

In [None]:
#export
def last2dims(f):
    """Wrap ``f`` so it operates on an array with two extra leading axes.

    The returned function takes index 0 along each of the first two axes of
    its input, calls ``f`` on what remains, and then prepends two new axes of
    length one to the result. Entries other than index 0 along the first two
    axes are ignored. Presumably intended for blocks shaped ``(1, 1, y, x)``
    -- TODO confirm against callers.

    Parameters
    ----------
    f : callable
        Function taking the reduced array and returning an array-like that
        supports ``[None, None, ...]`` indexing.

    Returns
    -------
    callable
        Wrapper accepting a single array argument. It keeps the name and
        docstring of ``f``.
    """

    # Preserve the wrapped function's metadata for debugging and introspection.
    @wraps(f)
    def func(array):
        return f(array[0, 0, ...])[None, None, ...]

    return func

## Helper functions for regionprops

In [None]:
#export
def extend_region_properties_list(extra_properties: list = None):
    """Return the default region properties, optionally extended.

    Parameters
    ----------
    extra_properties : list, optional
        Additional property names appended after the defaults. ``None``
        (the default) means no extras.

    Returns
    -------
    list
        ``["label", "area", "mean_intensity", "centroid"]`` followed by the
        entries of ``extra_properties``, if any.

    Raises
    ------
    TypeError
        If ``extra_properties`` cannot be concatenated to a list.
    """
    default_properties = ["label", "area", "mean_intensity", "centroid"]

    if extra_properties is None:
        return default_properties

    try:
        return default_properties + extra_properties
    except TypeError:
        raise TypeError("extra_properties must be a list")

In [None]:
#export
def add_scale_regionprops_table_area_measurements(df, pixel_size):
    """Append scaled copies of every area column to a regionprops table.

    Each column whose name matches the regex ``area`` is multiplied by
    ``pixel_size ** 2`` and added as a new column with the suffix ``_um2``.
    The input frame is not modified.

    Parameters
    ----------
    df : pandas.DataFrame
        Table containing one or more columns with ``area`` in their name.
    pixel_size : float
        Edge length of one pixel; presumably in micrometres, given the
        ``_um2`` suffix -- TODO confirm.

    Returns
    -------
    pandas.DataFrame
        The original columns followed by the scaled ``*_um2`` columns.
    """
    area_per_pixel = pixel_size ** 2
    area_columns = df.filter(regex=r"area")
    scaled_columns = (area_columns * area_per_pixel).add_suffix("_um2")
    return pd.concat([df, scaled_columns], axis=1)

In [None]:
#export
@delayed
def lazy_props(seg, img, seg_ch, img_ch, seg_name, img_name, properties, **kwargs):
    """Build a region-properties table tagged with its source images.

    The function is wrapped in dask's ``delayed``, so calling it creates a
    lazy task instead of computing the table immediately.

    Parameters
    ----------
    seg
        Label image passed as the first argument to
        ``measure.regionprops_table``.
    img
        Intensity image passed as the second argument to
        ``measure.regionprops_table``.
    seg_ch, img_ch
        Values stored in the ``seg_channel`` and ``intensity_img_channel``
        columns.
    seg_name, img_name
        Values stored in the ``segmentation_img_name`` and
        ``intensity_img_name`` columns.
    properties
        Property names forwarded as ``properties=``.
    **kwargs
        Extra keyword arguments forwarded to ``measure.regionprops_table``.

    Returns
    -------
    pandas.DataFrame
        The regionprops table with the four metadata columns appended.
    """
    table = measure.regionprops_table(seg, img, properties=properties, **kwargs)
    props_df = pd.DataFrame(table)

    # Record which segmentation / intensity image each row was measured on.
    metadata = {
        "seg_channel": seg_ch,
        "intensity_img_channel": img_ch,
        "segmentation_img_name": seg_name,
        "intensity_img_name": img_name,
    }
    for column, value in metadata.items():
        props_df[column] = value

    return props_df