In [1]:
# from napari_phasors._reader import *
# from napari_phasors._reader import _get_filename_extension, _parse_and_call_io_function
from natsort import natsorted
from pathlib import Path

In [2]:
# Example acquisition locations used by the cells below (machine-specific absolute paths).
# NOTE(review): `path` ends in '.sptw' but is tested with `Path(path).is_dir()` in the
# next cell, i.e. it is handled as a folder of slices rather than as a single file.
path = r"D:\Datasets\FLIM\new_acquisition\hazelnut_3D_FLIM.sptw"
# NOTE(review): `path2` is not referenced by any other cell visible in this notebook.
path2 = r"D:\Datasets\FLIM\new_acquisition\hazelnut_3D_FLIM_folder"

In [3]:
# Check whether the '.sptw' acquisition path is a directory rather than a single file.
Path(path).is_dir()

True

flim-phasor-plotter-reader functions

In [19]:
def get_current_tz(file_path):
    """Extract the time point and z slice encoded in a file name.

    The file stem is searched for the first '_t<digits>' and the first
    '_z<digits>' occurrence. File names are numbered from 1, so each number
    found is shifted down by one to give a 0-based index.

    Parameters
    ----------
    file_path : Path
        A Path object from pathlib whose stem may contain '_t' and '_z'
        patterns.

    Returns
    -------
    current_t, current_z : Tuple(int or None, int or None)
        0-based time point and z slice. An entry is None when the
        corresponding pattern is absent from the file name.
    """
    import re

    stem = file_path.stem

    def _zero_based_index(regex):
        # None when the pattern is absent, otherwise the 1-based number
        # from the file name converted to a 0-based index.
        found = re.search(regex, stem)
        return None if found is None else int(found.group(1)) - 1

    return _zero_based_index('_t(\\d+)'), _zero_based_index('_z(\\d+)')

def get_max_zslices(file_paths, file_extension):
    """Get the largest z-slice index found among the given files.

    Only files whose suffix equals `file_extension` are considered. The z
    index of each file is taken from `get_current_tz` (0-based), and files
    without a '_z' pattern are ignored.

    Parameters
    ----------
    file_paths : List of paths
        A list of Path objects from pathlib.
    file_extension : str
        A file extension, like '.tif' or '.ptu'.

    Returns
    -------
    max_z : int
        Largest 0-based z index, or 0 when no matching file carries a z
        index (including when there are no matching files at all).
    """
    z_indices = []
    for file_path in file_paths:
        if file_path.suffix != file_extension:
            continue
        _, current_z = get_current_tz(file_path)
        if current_z is not None:
            z_indices.append(current_z)
    # Take the maximum over the z values only. Taking max() over (t, z)
    # tuples compares lexicographically and would return the z of the latest
    # time point instead of the overall largest z.
    return max(z_indices, default=0)


def get_max_time_points(file_paths, file_extension):
    """Get the largest time-point index found among the given files.

    Only files whose suffix equals `file_extension` are considered. The time
    index of each file is taken from `get_current_tz` (0-based), and files
    without a '_t' pattern are ignored.

    Parameters
    ----------
    file_paths : List of paths
        A list of Path objects from pathlib.
    file_extension : str
        A file extension, like '.tif' or '.ptu'.

    Returns
    -------
    max_time : int
        Largest 0-based time index, or 0 when no matching file carries a
        time index (including when there are no matching files at all).
    """
    time_indices = []
    for file_path in file_paths:
        if file_path.suffix != file_extension:
            continue
        current_t, _ = get_current_tz(file_path)
        if current_t is not None:
            time_indices.append(current_t)
    # Compare the time values only. max() over (t, z) tuples raises
    # TypeError as soon as a None component meets an int component.
    return max(time_indices, default=0)

def get_structured_list_of_paths(file_paths, file_extension):
    """Group file paths by time point.

    Files whose suffix differs from `file_extension` are ignored. The
    remaining files are naturally sorted and grouped by the 0-based time
    index returned by `get_current_tz`.

    Parameters
    ----------
    file_paths : List of paths
        A list of Path objects from pathlib.
    file_extension : str
        A file extension, like '.tif' or '.ptu'.

    Returns
    -------
    t_path_list : List of lists of paths
        A list of lists of Path objects from pathlib. The outer list runs
        over time points in increasing order and each inner list holds the
        naturally sorted files of that time point. When no file name carries
        a time index, a single inner list with all matching files is
        returned (an empty inner list if nothing matches).
    """
    from natsort import natsorted

    matching_paths = [
        file_path
        for file_path in natsorted(file_paths)
        if file_path.suffix == file_extension
    ]

    paths_by_time = {}
    for file_path in matching_paths:
        current_t, _ = get_current_tz(file_path)
        if current_t is not None:
            # Group by the time index itself instead of detecting "t
            # increased" while iterating. This does not depend on the first
            # index being 0 or on '_t' preceding '_z' in the file names.
            paths_by_time.setdefault(current_t, []).append(file_path)

    # No time information at all: everything belongs to one time point
    if not paths_by_time:
        return [matching_paths]
    return [paths_by_time[time_index] for time_index in sorted(paths_by_time)]

def get_most_frequent_file_extension(path):
    """Get most frequent file extension in path.

    Parameters
    ----------
    path : str, Path, or list of str
        Path to a file or folder, or a list of paths.

    Returns
    -------
    most_frequent_file_type : str
        Most frequent file extension in path. Ties are resolved in favour
        of the extension encountered first.

    Raises
    ------
    ValueError
        If there is nothing to inspect (empty list or empty folder).
    """
    from collections import Counter
    from pathlib import Path

    if isinstance(path, (list, tuple)):
        # Reader plugins may be handed a single path or a list of paths.
        # A list is assumed to be an image stack, so look at the most
        # common file extension/suffix among its entries.
        suffixes = [Path(p).suffix for p in path]
    else:
        path = Path(path)
        if path.is_dir():
            if path.suffix == '.zarr':
                # A '.zarr' store is a folder that counts as a single file
                suffixes = ['.zarr']
            else:
                # Collect the suffixes of the entries inside the folder
                suffixes = [p.suffix for p in path.iterdir()]
        else:
            # Single file. Also used for paths that do not exist (yet), for
            # which only the name can be inspected.
            suffixes = [path.suffix]

    if not suffixes:
        raise ValueError(
            "No files found to determine the most frequent file extension."
        )
    # Counter counts in a single pass and orders ties by first occurrence
    most_frequent_file_type = Counter(suffixes).most_common(1)[0][0]
    return most_frequent_file_type

napari-phasors reader functions

In [None]:
"""
This module contains functions to read files supported by `phasorpy.io`
and computes phasor coordinates with `phasorpy.phasor.phasor_from_signal`

"""

import inspect
import os
from typing import Any, Callable, Optional, Sequence, Union

import numpy as np
import pandas as pd
import phasorpy.io as io
import tifffile
from napari.layers import Labels
from napari.utils.notifications import show_error
from phasorpy.phasor import phasor_from_signal

# Reader dispatch table, grouped by data kind ("raw" vs "processed").
# Each entry maps a lower-case file extension to a callable taking
# (path, reader_options). The callable delegates to
# `_parse_and_call_io_function`, passing the reader function and a dict of
# per-argument defaults written as `name: (default_value, is_required)`.
# The lambdas resolve `_parse_and_call_io_function` and the `io.*` readers
# only when they are called, so the helper may be defined further down.
extension_mapping = {
    "raw": {
        # .ptu / .fbd: frame=-1 and keepdims=True are passed unless
        # overridden through reader_options.
        ".ptu": lambda path, reader_options: _parse_and_call_io_function(
            path,
            io.read_ptu,
            {"frame": (-1, False), "keepdims": (True, False)},
            reader_options,
        ),
        ".fbd": lambda path, reader_options: _parse_and_call_io_function(
            path,
            io.read_fbd,
            {"frame": (-1, False), "keepdims": (True, False)},
            reader_options,
        ),
        # .sdt / .lsm / .tif: no defaults, only reader_options are forwarded
        ".sdt": lambda path, reader_options: _parse_and_call_io_function(
            path,
            io.read_sdt,
            {},
            reader_options,
        ),
        ".lsm": lambda path, reader_options: _parse_and_call_io_function(
            path,
            io.read_lsm,
            {},
            reader_options,
        ),
        ".tif": lambda path, reader_options: _parse_and_call_io_function(
            path,
            tifffile.imread,
            {},
            reader_options,
        ),
        # ".flif": lambda path: io.read_flif(path),
        # ".bh": lambda path: io.read_bh(path),
        # ".bhz": lambda path: io.read_bhz(path),
        # ".ifli": lambda path: io.read_ifli(),
    },
    "processed": {
        # NOTE(review): '.ome.tif' also ends with the raw '.tif' key, so
        # callers that match with `endswith` must test "processed" first.
        ".ome.tif": lambda path, reader_options: _parse_and_call_io_function(
            path, io.phasor_from_ometiff, {}, reader_options
        ),
        # ".b64": lambda path: io.read_b64(path),
        # ".r64": lambda path: io.read_r64(path),
        # ".ref": lambda path: io.read_ref(path)
    },
}
"""This dictionary contains the mapping for reader functions from
`phasorpy.io` supported formats.

Commented file extensions are not supported at the moment.

"""

iter_index_mapping = {
    # Formats that are split along the "C" axis, one result per entry
    **dict.fromkeys((".ptu", ".fbd"), "C"),
    # Formats that are processed as a whole (no axis to iterate over)
    **dict.fromkeys((".lsm", ".tif", ".sdt")),
}
"""This dictionary contains the mapping for the axis to iterate over
when calculating phasor coordinates in the file.
"""


def napari_get_reader(
    path: str,
    reader_options: Optional[dict] = None,
    harmonics: Union[int, Sequence[int], None] = None,
) -> Optional[Callable]:
    """Initial reader function to map file extension to
    specific reader functions.

    The extension is matched case-insensitively against the keys of
    `extension_mapping`. "processed" extensions are tested first so that
    '.ome.tif' is not captured by the raw '.tif' entry.

    Parameters
    ----------
    path : str
        Path to file. Path-like objects are accepted as well. Anything
        else (e.g. a list of paths) is reported as unsupported.
    reader_options : dict, optional
        Dictionary containing the arguments to pass to the function.
    harmonics : Union[int, Sequence[int], None], optional
        Harmonic(s) to be processed. Can be a single integer, a sequence of
        integers, or None. Default is None.

    Returns
    -------
    reader : callable or None
        A function taking a path and returning a list of LayerData tuples
        (see `raw_file_reader` and `processed_file_reader`), or None if the
        file extension is not supported.

    """
    # NOTE: folders (stacks of z-slices / time points) are not handled here.
    if not isinstance(path, (str, os.PathLike)):
        show_error("File extension not supported.")
        return None

    # Lower-case to stay consistent with `_get_filename_extension`, which
    # lower-cases the extension used for the reader lookup.
    path_lower = os.fspath(path).lower()

    if path_lower.endswith(tuple(extension_mapping["processed"].keys())):
        return lambda path: processed_file_reader(
            path, reader_options=reader_options, harmonics=harmonics
        )
    if path_lower.endswith(tuple(extension_mapping["raw"].keys())):
        return lambda path: raw_file_reader(
            path, reader_options=reader_options, harmonics=harmonics
        )

    show_error("File extension not supported.")
    return None


def _phasor_layer_data(signal, axis, filename, harmonics, layer_name):
    """Compute phasor coordinates of `signal` along `axis` and package the
    result as one LayerData tuple `(mean_intensity_image, add_kwargs)`."""
    mean_intensity_image, G_image, S_image = phasor_from_signal(
        signal, axis=axis, harmonic=harmonics
    )
    labels_layer = make_phasors_labels_layer(
        mean_intensity_image,
        G_image,
        S_image,
        name=filename,
        harmonics=harmonics,
    )
    add_kwargs = {
        "name": layer_name,
        "metadata": {
            "phasor_features_labels_layer": labels_layer,
            "original_mean": mean_intensity_image,
        },
    }
    return (mean_intensity_image, add_kwargs)


def raw_file_reader(
    path: str,
    reader_options: Optional[dict] = None,
    harmonics: Union[int, Sequence[int], None] = None,
) -> list[tuple]:
    """Read raw data files from supported file formats and apply the phasor
    transformation to get mean intensity image and phasor coordinates.

    The file is read with the reader registered in
    `extension_mapping["raw"]`. If `iter_index_mapping` names an axis for
    the extension, one layer is produced per entry along that axis and the
    phasor is computed along the "H" axis. Otherwise a single layer is
    produced and the phasor is computed along axis 0 ('.tif'), the last axis
    ('.sdt') or the "C" axis (any other extension).

    Parameters
    ----------
    path : str
        Path to file.
    reader_options : dict, optional
        Dictionary containing the arguments to pass to the function.
    harmonics : Union[int, Sequence[int], None], optional
        Harmonic(s) to be processed. Can be a single integer, a sequence of
        integers, or None. Default is None.

    Returns
    -------
    layer_data : list of tuples
        A list of LayerData tuples (data, kwargs), where data is the mean
        intensity image as an array, and kwargs is a dict of keyword
        arguments for the corresponding viewer.add_* method in napari. It
        contains the 'name' of the layer as well as the 'metadata', which is
        also a dict. The value for key 'phasor_features_labels_layer' in
        'metadata' is the Labels layer built by `make_phasors_labels_layer`.

    """
    filename, file_extension = _get_filename_extension(path)
    raw_data = extension_mapping["raw"][file_extension](path, reader_options)
    iter_axis = iter_index_mapping[file_extension]

    if iter_axis is None:
        if file_extension == ".tif":
            signal_axis = 0
        elif file_extension == '.sdt':
            signal_axis = -1
        else:
            # Calculate phasor over channels if file is of hyperspectral type
            signal_axis = raw_data.dims.index("C")
        return [
            _phasor_layer_data(
                raw_data,
                signal_axis,
                filename,
                harmonics,
                f"{filename} Intensity Image",
            )
        ]

    layers = []
    iter_axis_index = raw_data.dims.index(iter_axis)
    for channel in range(raw_data.shape[iter_axis_index]):
        # Select once per channel, along the axis named by the mapping
        channel_data = raw_data.sel({iter_axis: channel})
        # Calculate phasor over photon counts dimension if file is FLIM
        layers.append(
            _phasor_layer_data(
                channel_data,
                channel_data.dims.index("H"),
                filename,
                harmonics,
                f"{filename} Intensity Image: Channel {channel}",
            )
        )
    return layers


def processed_file_reader(
    path: str,
    reader_options: Optional[dict[str, str]] = None,
    harmonics: Union[int, Sequence[int], None] = None,
) -> list[tuple]:
    """Reader function for files that contain processed images, as phasor
    coordinates or intensity images.

    Parameters
    ----------
    path : str
        Path to file.
    reader_options : dict, optional
        Dictionary containing the arguments to pass to the function. If it
        has no 'harmonic' entry, `harmonics` is forwarded under that key so
        that the harmonics read from the file match the ones used to label
        the phasor features. The caller's dict is not modified.
    harmonics : Union[int, Sequence[int], None], optional
        Harmonic(s) to be processed. Can be a single integer, a sequence of
        integers, or None. Default is None.

    Returns
    -------
    layer_data : list of tuples
        A list with one LayerData tuple (data, kwargs), where data is the
        mean intensity image as an array, and kwargs is a dict of keyword
        arguments for the corresponding viewer.add_* method in napari. It
        contains the 'name' of the layer as well as the 'metadata', which is
        also a dict holding 'phasor_features_labels_layer', 'original_mean'
        and the 'attrs' returned by the file reader.

    """
    filename, file_extension = _get_filename_extension(path)

    # Work on a copy and only fill in 'harmonic' when the caller did not set
    # it. Otherwise `harmonics` would be dropped whenever other reader
    # options are given.
    reader_options = dict(reader_options or {})
    reader_options.setdefault('harmonic', harmonics)

    mean_intensity_image, G_image, S_image, attrs = extension_mapping[
        "processed"
    ][file_extension](path, reader_options)
    labels_layer = make_phasors_labels_layer(
        mean_intensity_image,
        G_image,
        S_image,
        name=filename,
        harmonics=harmonics,
    )
    add_kwargs = {
        "name": filename + " Intensity Image",
        "metadata": {
            "phasor_features_labels_layer": labels_layer,
            "original_mean": mean_intensity_image,
            "attrs": attrs,
        },
    }
    return [(mean_intensity_image, add_kwargs)]


def make_phasors_labels_layer(
    mean_intensity_image: Any,
    G_image: Any,
    S_image: Any,
    name: str = "",
    harmonics: Union[int, Sequence[int], None] = None,
) -> Labels:
    """Create a napari Labels layer from phasor coordinates.

    Every pixel of `mean_intensity_image` gets a unique label (starting at
    1). The features table holds one row per pixel and harmonic with the
    columns 'label', 'G_original', 'S_original', 'G', 'S' and 'harmonic'.

    Parameters
    ----------
    mean_intensity_image : np.ndarray
        Mean intensity image.
    G_image : np.ndarray
        G phasor coordinates. Either the same shape as
        `mean_intensity_image` (single harmonic) or with one extra leading
        axis that runs over harmonics.
    S_image : np.ndarray
        S phasor coordinates, same shape as `G_image`.
    name : str, optional
        Name of the layer, by default ''.
    harmonics : Union[int, Sequence[int], None], optional
        Harmonic(s) to be processed. Can be a single integer, a sequence of
        integers, or None. Default is None, in which case harmonics are
        numbered 1, 2, ... along the harmonic axis.

    Returns
    -------
    labels_layer : napari.layers.Labels
        Labels layer with phasor coordinates as features.

    """
    pixel_id = np.arange(1, mean_intensity_image.size + 1)

    # Normalize `harmonics` to a list (or None) so that a bare int, a list
    # and a tuple are all handled the same way below.
    if harmonics is None:
        harmonic_values = None
    elif np.isscalar(harmonics):
        harmonic_values = [harmonics]
    else:
        harmonic_values = list(harmonics)

    # A harmonic axis exists only if G has more dimensions than the
    # intensity image. A plain 3-D image (e.g. with a z or t axis) must not
    # be mistaken for a stack of harmonics.
    has_harmonic_axis = G_image.ndim > mean_intensity_image.ndim

    if has_harmonic_axis:
        sub_tables = []
        for i in range(G_image.shape[0]):
            harmonic_value = (
                harmonic_values[i] if harmonic_values is not None else i + 1
            )
            g_flat = G_image[i].ravel()
            s_flat = S_image[i].ravel()
            sub_tables.append(
                pd.DataFrame(
                    {
                        "label": pixel_id,
                        "G_original": g_flat,
                        "S_original": s_flat,
                        "G": g_flat,
                        "S": s_flat,
                        "harmonic": harmonic_value,
                    }
                )
            )
        # Concatenate once instead of growing the table inside the loop
        table = pd.concat(sub_tables) if sub_tables else pd.DataFrame()
    else:
        harmonic_value = harmonic_values[0] if harmonic_values else 1
        g_flat = G_image.ravel()
        s_flat = S_image.ravel()
        table = pd.DataFrame(
            {
                "label": pixel_id,
                "G_original": g_flat,
                "S_original": s_flat,
                "G": g_flat,
                "S": s_flat,
                "harmonic": harmonic_value,
            }
        )

    labels_data = pixel_id.reshape(mean_intensity_image.shape)
    labels_layer = Labels(
        labels_data,
        name=f"{name} Phasor Features Layer",
        scale=(1, 1),
        features=table,
    )
    return labels_layer


def _parse_and_call_io_function(
    path: str,
    func: Callable,
    args_defaults: dict[str, Any],
    reader_options: Optional[dict[str, Any]] = None,
) -> Any:
    """Private helper function to parse arguments and call a `io` function.

    Parameters
    ----------
    path : str
        Path to file.
    func : callable
        Function to call.
    args_defaults : dict
        Dictionary containing the default arguments for the function.
    reader_options : dict, optional
        Dictionary containing the arguments to pass to the function.
        Default is None.

    Returns
    -------
    data : xarray.DataArray
        Data read from the file

    """
    args = {}
    # Use reader_options if provided, otherwise use the default
    if reader_options is not None:
        for arg, value in reader_options.items():
            args[arg] = value

    # Fill in defaults for any missing arguments not provided in reader_options
    for arg, (default, is_required) in args_defaults.items():
        if arg not in args:
            if is_required:
                raise ValueError(f"Required argument '{arg}' is missing.")
            args[arg] = default

    # Validate arguments against the function's signature
    valid_args = {}
    sig = inspect.signature(func)
    for arg, value in args.items():
        if arg in sig.parameters:
            valid_args[arg] = value
        else:
            raise ValueError(
                f"Invalid argument '{arg}' for function {func.__name__}."
            )
    return func(path, **valid_args)


def _get_filename_extension(path: str) -> tuple[str, str]:
    """Get the filename and extension from a path.

    Parameters
    ----------
    path : str
        Path to file.

    Returns
    -------
    filename : str
        Filename.
    file_extension : str
        File extension including the leading dot.

    """
    filename = os.path.basename(path)
    parts = filename.split(".", 1)
    if len(parts) > 1:
        file_extension = "." + parts[1]
    else:
        file_extension = ""
    return parts[0], file_extension.lower()


In [21]:
# Display the acquisition path that the stack-reading cell below operates on.
path

'D:\\Datasets\\FLIM\\new_acquisition\\hazelnut_3D_FLIM.sptw'

In [None]:
from tqdm import tqdm

# Prototype: read a folder of z-slice / time-point files into one 4D stack.
harmonics = [1, 2]
reader_options = None
if Path(path).is_dir():
    print('is dir')
    folder_path = Path(path)
    # Get most common file extension inside folder (unless "folder" is a ".zarr" file)
    file_extension = get_most_frequent_file_extension(folder_path)
    if file_extension == '.zarr':
        show_error(f"File extension {file_extension} not supported.")
    else:
        # Get all file paths inside folder with the most common file extension
        file_paths = natsorted(
            [
                file_path
                for file_path in folder_path.iterdir()
                if file_path.suffix == file_extension
            ]
        )
        # Group the files by time point based on the file name convention
        # ("_t" and "_z" patterns)
        list_of_time_point_paths = get_structured_list_of_paths(
            file_paths, file_extension
        )
        processed_extensions = tuple(extension_mapping["processed"].keys())
        raw_extensions = tuple(extension_mapping["raw"].keys())
        # Count the slices actually present instead of assuming that every
        # time point has as many slices as the first one
        total_slices = sum(
            len(list_of_zslice_paths)
            for list_of_zslice_paths in list_of_time_point_paths
        )

        t_list, z_list_labels = [], []
        max_label = 0
        built_labels = False
        # The context manager closes the progress bar even if a read fails
        with tqdm(
            total=total_slices, desc='Reading stack', unit='slices'
        ) as progress_bar:
            for list_of_zslice_paths in list_of_time_point_paths:
                z_list = []
                for zslice_path in list_of_zslice_paths:
                    # Pick the reader from the full file name: `.suffix`
                    # alone can never match compound keys such as '.ome.tif'
                    file_name = zslice_path.name.lower()
                    if file_name.endswith(processed_extensions):
                        layer_data_tuple = processed_file_reader(
                            path=str(zslice_path),
                            reader_options=reader_options,
                            harmonics=harmonics,
                        )
                    elif file_name.endswith(raw_extensions):
                        layer_data_tuple = raw_file_reader(
                            path=str(zslice_path),
                            reader_options=reader_options,
                            harmonics=harmonics,
                        )
                    else:
                        # Stop here: continuing would reuse the previous
                        # slice (or fail on an undefined name)
                        raise ValueError(
                            f"File extension {zslice_path.suffix} not supported."
                        )

                    z_slice_image = np.squeeze(layer_data_tuple[0][0])
                    if not built_labels:
                        # Store labels z-slice array, offset so that labels
                        # stay unique across the slices of the stack
                        z_slice_labels = (
                            np.squeeze(
                                layer_data_tuple[0][1]['metadata'][
                                    'phasor_features_labels_layer'
                                ].data
                            )
                            + max_label
                        )
                        max_label = z_slice_labels.max()
                        z_list_labels.append(z_slice_labels)
                    z_list.append(z_slice_image)
                    progress_bar.update(1)
                # Build z-stack array
                z_stack = np.stack(z_list)
                if not built_labels:
                    z_stack_labels = np.stack(z_list_labels)
                    built_labels = True  # only needs to be built once
                t_list.append(z_stack)
        # Build time-lapse array
        stack = np.stack(t_list)

# TODO: This needs to return a layer data tuple with the 3D stacks
    
    

is dir


Reading stack: 100%|██████████| 63/63 [00:58<00:00,  1.08slices/s]


In [55]:
# Inspect the metadata of the first layer returned for the last file read above.
layer_data_tuple[0][1]['metadata']

{'phasor_features_labels_layer': <Labels layer 'hazelnut_3D_FLIM_z63 Phasor Features Layer' at 0x2d6a9a23010>,
 'original_mean': array([[[0.        , 0.00757576, 0.00757576, ..., 0.        ,
          0.        , 0.        ],
         [0.00757576, 0.        , 0.00378788, ..., 0.        ,
          0.        , 0.        ],
         [0.01136364, 0.        , 0.00378788, ..., 0.00378788,
          0.        , 0.00378788],
         ...,
         [0.00378788, 0.        , 0.00378788, ..., 0.00378788,
          0.00757576, 0.00757576],
         [0.        , 0.00378788, 0.        , ..., 0.        ,
          0.        , 0.00378788],
         [0.00378788, 0.        , 0.        , ..., 0.00378788,
          0.00757576, 0.        ]]])}

In [49]:
# Stack axes: (time point, z-slice, then the axes of each squeezed slice image).
stack.shape


(1, 63, 512, 512)