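# 1b Nucleoid Mapping to Cells

Assigns every nucleoid (the nucleoid region-property table in `analysis`) to the cell segment (from the `lineage` table) whose bounding box contains the nucleoid centroid. It writes the nucleoid table with the assignment to `analysisNucsMappedToCells`. It then aggregates per-cell nucleoid statistics (count, total area/length, separations, mean/CV of shape and intensity) and writes the lineage table joined with those statistics to `lineage_wNucleoid`.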

In [1]:
import trenchripper.trenchripper as tr

import warnings
import dask

import numpy as np
import skimage as sk
import pandas as pd
import dask.dataframe as dd

from dask.distributed import wait

# Report each distinct warning only once instead of on every occurrence.
warnings.filterwarnings(action="once")

# Scheduler tuning, applied in one call: turn on the active memory manager,
# set the worker time-to-live to 5 minutes, and allow up to 100 failures
# of a task before it is given up on.
dask.config.set(
    {
        "distributed.scheduler.active-memory-manager.start": True,
        "distributed.scheduler.worker-ttl": "5m",
        "distributed.scheduler.allowed-failures": 100,
    }
);

# Scratch directory passed to the cluster controller as its working directory.
dask_wd = "/home/de64/scratch/de64/dask"

In a future release, Dask DataFrame will use new implementation that
contains several improvements including a logical query planning.
The user-facing DataFrame API will remain unchanged.

The new implementation is already available and can be enabled by
installing the dask-expr library:

    $ pip install dask-expr

and turning the query planning option on:

    >>> import dask
    >>> dask.config.set({'dataframe.query-planning': True})
    >>> import dask.dataframe as dd

API documentation for the new implementation is available at
https://docs.dask.org/en/stable/dask-expr-api.html

Any feedback can be reported on the Dask issue tracker
https://github.com/dask/dask/issues 

  import dask.dataframe as dd


In [3]:
# Start a non-local (local=False) Dask cluster through trenchripper's controller:
# 20 workers (at least 20), memory="10GB", 1-hour walltime.
# NOTE(review): whether memory/walltime are per worker or per job is defined by trenchripper — confirm.
dask_controller = tr.trcluster.dask_controller(
    local=False,
    n_workers=20,
    n_workers_min=20,
    memory="10GB",
    walltime="1:00:00",
    working_directory=dask_wd,
)
dask_controller.startdask()

50m
1:00:00


In [4]:
# Display the cluster dashboard via trenchripper's helper (presumably a link to the Dask dashboard for monitoring the jobs below).
dask_controller.displaydashboard()

## Function definitions

In [6]:
def assign_nucleoids_to_cells_df(nuc_df, cell_df):
    """Assign each nucleoid of one partition to the cell whose bounding box contains it.

    Parameters
    ----------
    nuc_df : pandas.DataFrame
        Nucleoid table for one partition. Needs 'File Trench Index', 'timepoints',
        'Objectid', 'centroid_x', 'centroid_y' and 'File Parquet Index', as columns
        or as index levels (the frame is ``reset_index``-ed first).
    cell_df : pandas.DataFrame
        Cell table for the same partition. Needs 'File Trench Index', 'timepoints',
        'Segment Index', 'bbox_min_row', 'bbox_min_col', 'bbox_max_row',
        'bbox_max_col' and 'File Parquet Index'.

    Returns
    -------
    pandas.DataFrame
        A copy of ``nuc_df`` (the input is not modified) with four added columns:

        - 'Segment Index': Segment Index label of the first containing cell along the
          Segment Index axis, or -1 if no cell contains the nucleoid.
        - 'Cells Per Nucleoid': number of cells whose bounding box contains it.
        - 'Nuc Parquet Index'.
        - 'Cell Parquet Index': -1 if unmapped.

    Notes
    -----
    The new columns are assigned positionally. This relies on:

    - ``nuc_df`` rows being in the (File Trench Index, timepoints, Objectid) order in
      which ``assign_nucleoids_to_cells`` returns results;
    - every nucleoid being returned, i.e. it has non-NaN coordinates and its
      trench/timepoint has at least one cell, which ``reconcile_trenches`` is
      used to ensure.

    A length mismatch raises ``ValueError`` from pandas.
    """
    index_cols = ['File Trench Index', 'timepoints']

    # Dense (trench, timepoint, object) grids; combinations absent from the frames become NaN.
    nuc_ds = (nuc_df.reset_index()
              .loc[:, index_cols + ['Objectid', 'centroid_x', 'centroid_y', 'File Parquet Index']]
              .set_index(index_cols + ['Objectid'])
              .to_xarray())
    cell_ds = (cell_df.reset_index()
               .loc[:, index_cols + ['Segment Index', 'bbox_min_row', 'bbox_min_col',
                                     'bbox_max_row', 'bbox_max_col', 'File Parquet Index']]
               .set_index(index_cols + ['Segment Index'])
               .to_xarray())

    # Data variables become the trailing axis, in column order:
    #   nuc_array[..., :]  = [centroid_x, centroid_y, File Parquet Index]
    #   cell_array[..., :] = [bbox_min_row, bbox_min_col, bbox_max_row, bbox_max_col, File Parquet Index]
    nuc_array = np.moveaxis(nuc_ds.to_array().to_numpy(), 0, -1)
    cell_array = np.moveaxis(cell_ds.to_array().to_numpy(), 0, -1)
    # Segment Index label at each position of the cell axis of cell_array.
    segment_labels = cell_ds['Segment Index'].to_numpy()
    del nuc_ds, cell_ds

    cell_slot, cells_per_nuc, nuc_parquet_ids, cell_parquet_ids = assign_nucleoids_to_cells(
        nuc_array=nuc_array, cell_array=cell_array)
    del nuc_array, cell_array

    # assign_nucleoids_to_cells returns positions along the cell axis (-1 = unmapped), not labels.
    # Translate them; positions and labels only coincide when labels are exactly 0..K-1.
    segment_index = np.full(cell_slot.shape, -1, dtype='int64')
    is_mapped = cell_slot >= 0
    segment_index[is_mapped] = segment_labels[cell_slot[is_mapped]]

    # Work on a copy: map_partitions inputs (here persisted partitions) must not be mutated.
    nuc_df_output = nuc_df.copy()
    nuc_df_output["Segment Index"] = segment_index
    nuc_df_output["Cells Per Nucleoid"] = cells_per_nuc
    nuc_df_output["Nuc Parquet Index"] = nuc_parquet_ids
    nuc_df_output["Cell Parquet Index"] = cell_parquet_ids
    return nuc_df_output

def assign_nucleoids_to_cells(nuc_array, cell_array):
    """Match each nucleoid to the first cell whose bounding box strictly contains its centroid.

    Parameters
    ----------
    nuc_array : numpy.ndarray
        4-D array (n_trenches, n_timepoints, max_nucleoids, 3). The last axis holds
        [centroid_x, centroid_y, File Parquet Index]; unused slots are NaN.
    cell_array : numpy.ndarray
        4-D array (n_trenches, n_timepoints, max_cells, 5). The last axis holds
        [bbox_min_row, bbox_min_col, bbox_max_row, bbox_max_col, File Parquet Index];
        unused slots are NaN. The first two axes must line up with ``nuc_array``.

    Returns
    -------
    cell_index_nuc : numpy.ndarray of int
        Position along the cell axis of the first containing cell, or -1 if none.
        This is a slot position, not a Segment Index label.
    cells_per_nuc : numpy.ndarray of int
        Number of cells whose bounding box contains the nucleoid (0 if none).
    nuc_parquet_ids : numpy.ndarray of int64
        File Parquet Index of the nucleoid.
    cell_parquet_ids : numpy.ndarray of int64
        File Parquet Index of the matched cell, or -1 if unmapped.

    Notes
    -----
    One entry is returned per selected nucleoid, in C order of
    (trench, timepoint, nucleoid slot). A nucleoid is selected when at least one
    cell contains it. It is also selected, with no containing cell, when its x is
    non-NaN and its trench/timepoint has at least one non-NaN cell slot.
    Containment uses strict inequalities on all four box edges; x is compared
    with columns and y with rows.
    """
    # Nucleoid centroids, shape (T, P, N, 1), broadcast against every cell.
    x = nuc_array[:, :, :, 0][:, :, :, None]
    y = nuc_array[:, :, :, 1][:, :, :, None]

    # Cell bounding boxes, shape (T, P, 1, C), broadcast against every nucleoid.
    min_row = cell_array[:, :, :, 0][:, :, None, :]
    min_col = cell_array[:, :, :, 1][:, :, None, :]
    max_row = cell_array[:, :, :, 2][:, :, None, :]
    max_col = cell_array[:, :, :, 3][:, :, None, :]

    # (T, P, N, C): True where nucleoid n lies inside cell c. NaN comparisons are False.
    inside = (x > min_col) & (x < max_col) & (y > min_row) & (y < max_row)

    n_cells_per_nuc = inside.sum(axis=3)

    # A real nucleoid (non-NaN x) in a trench/timepoint that has at least one real cell.
    not_nan_collapsed = np.any(~(np.isnan(x) | np.isnan(min_row)), axis=3)

    # Append an always-True sentinel column so argmax returns n_cell_slots
    # (instead of 0) when no real cell contains the nucleoid.
    n_cell_slots = inside.shape[3]
    sentinel = np.ones(inside.shape[:3] + (1,), dtype=bool)
    first_cell = np.argmax(np.concatenate([inside, sentinel], axis=3), axis=3)
    nuc_has_zero_cells = first_cell == n_cell_slots
    first_cell[nuc_has_zero_cells] = -1

    nuc_selected = (nuc_has_zero_cells & not_nan_collapsed) | (n_cells_per_nuc > 0)
    trench_idx, time_idx, nuc_idx = np.nonzero(nuc_selected)
    selected = (trench_idx, time_idx, nuc_idx)

    nuc_parquet_ids = nuc_array[selected][:, 2].astype('int64')
    cells_per_nuc = n_cells_per_nuc[selected]
    cell_index_nuc = first_cell[selected]

    # Gather cell parquet ids only for mapped nucleoids. This avoids casting the
    # NaN rows that a -1 position would select to int64 (an invalid cast).
    cell_parquet_ids = np.full(cell_index_nuc.shape, -1, dtype='int64')
    mapped = cell_index_nuc >= 0
    cell_parquet_ids[mapped] = cell_array[trench_idx[mapped],
                                          time_idx[mapped],
                                          cell_index_nuc[mapped], 4].astype('int64')

    return cell_index_nuc, cells_per_nuc, nuc_parquet_ids, cell_parquet_ids
    
def reconcile_trenches(ref_df, masked_df, key="Trenchid Timepoint Index"):
    """Filter ``masked_df`` down to rows whose ``key`` value also appears in ``ref_df``.

    Parameters
    ----------
    ref_df : pandas.DataFrame
        Table providing the allowed ``key`` values.
    masked_df : pandas.DataFrame
        Table to filter; its index and row order are preserved.
    key : str
        Column shared by both tables.

    Returns
    -------
    pandas.DataFrame
        The rows of ``masked_df`` whose ``key`` occurs in both tables.
    """
    shared_keys = set(ref_df[key].unique().tolist()).intersection(masked_df[key].unique().tolist())
    keep_rows = masked_df[key].isin(sorted(shared_keys))
    return masked_df[keep_rows]

def _nucleoid_separation_stats(centroid_y):
    """Return (mean, CV, min, max) of gaps between neighbouring nucleoid centroids of one cell.

    Centroids are sorted first, so every separation is the non-negative distance
    between adjacent nucleoids regardless of row order. All four values are NaN
    when there are fewer than two nucleoids. The CV is also NaN when it is
    exactly 0, e.g. for a single separation.
    """
    separations = np.diff(np.sort(np.asarray(centroid_y, dtype=float)))
    if separations.size == 0:
        return (np.nan, np.nan, np.nan, np.nan)
    mean = separations.mean()
    # np.std uses ddof=0 (population std). The other CVs use pandas .std(), which uses ddof=1.
    with np.errstate(divide='ignore', invalid='ignore'):
        cv = separations.std() / mean
    if cv == 0:
        cv = np.nan
    return (mean, cv, separations.min(), separations.max())


def assign_nuc_stats_to_cells(nuc_df):
    """Aggregate per-nucleoid measurements into one row of statistics per cell.

    Parameters
    ----------
    nuc_df : pandas.DataFrame
        One row per nucleoid. Requires a 'Cell Parquet Index' column with the
        assigned cell; -1 marks unassigned nucleoids, which form their own group.
        Also requires 'area', 'axis_major_length', 'centroid_y', 'solidity',
        'mCherry mean_intensity', 'axis_minor_length', 'eccentricity' and 'orientation'.

    Returns
    -------
    pandas.DataFrame
        Indexed by 'Cell Parquet Index', with columns in this order:

        - 'Number of Nucleoids', 'Total Nucleoid Area', 'Total Nucleoid Length';
        - 'Nucleoid Separation Mean', 'Nucleoid Separation CV',
          'Nucleoid Separation Min', 'Nucleoid Separation Max';
        - Mean and CV of nucleoid area, solidity and mean intensity;
        - Mean of major/minor axis length, eccentricity and orientation.

        An empty input gives an empty frame with the same columns.
    """
    grouped_nuc_df = nuc_df.groupby('Cell Parquet Index', sort=False)

    # Counts and totals per cell.
    nuc_data_per_cell = (
        grouped_nuc_df.size().rename('Number of Nucleoids').to_frame()
        .join(grouped_nuc_df['area'].sum().rename('Total Nucleoid Area'))
        .join(grouped_nuc_df['axis_major_length'].sum().rename('Total Nucleoid Length'))
    )

    # Separation statistics along centroid_y.
    separation_columns = ['Nucleoid Separation Mean', 'Nucleoid Separation CV',
                          'Nucleoid Separation Min', 'Nucleoid Separation Max']
    cell_ids, separation_rows = [], []
    for cell_id, centroid_y in grouped_nuc_df['centroid_y']:
        cell_ids.append(cell_id)
        separation_rows.append(_nucleoid_separation_stats(centroid_y))
    separation_stats = pd.DataFrame(separation_rows, index=cell_ids,
                                    columns=separation_columns, dtype=float)
    nuc_data_per_cell = nuc_data_per_cell.join(separation_stats)

    # Mean and CV (pandas std, ddof=1; NaN for a single nucleoid) of per-nucleoid properties.
    mean_and_cv_properties = {
        'area': 'Nucleoid Area',
        'solidity': 'Nucleoid Solidity',
        'mCherry mean_intensity': 'Nucleoid Mean Intensity',
    }
    for metric, label in mean_and_cv_properties.items():
        metric_mean = grouped_nuc_df[metric].mean().rename(label + ' Mean')
        metric_cv = (grouped_nuc_df[metric].std() / metric_mean).rename(label + ' CV')
        nuc_data_per_cell = nuc_data_per_cell.join(metric_mean).join(metric_cv)

    # Mean-only properties (eccentricity and orientation added 2023-04-04).
    mean_only_properties = {
        'axis_major_length': 'Nucleoid Major Axis Length',
        'axis_minor_length': 'Nucleoid Minor Axis Length',
        'eccentricity': 'Eccentricity',
        'orientation': 'Orientation',
    }
    for metric, label in mean_only_properties.items():
        nuc_data_per_cell = nuc_data_per_cell.join(
            grouped_nuc_df[metric].mean().rename(label + ' Mean'))

    return nuc_data_per_cell

## Merge Nucleoid Data to Lineage

In [7]:
# Experiment folders to process. Each must contain a "lineage" (cells) and an "analysis"
# (nucleoid region properties) parquet dataset; the loop below writes its outputs next to them.
headpath_list = ["/home/de64/scratch/de64/sync_folder/2023-03-25_lDE28_Run_1/Growth_Division"]

In [8]:
# Load Dask dataframes

# For each experiment:
#   1. assign nucleoids to cells and save the annotated nucleoid table;
#   2. aggregate nucleoid statistics per cell;
#   3. save the lineage table joined with those statistics.
for experiment_i, headpath in enumerate(headpath_list):
    # Inputs (cell lineages, nucleoid region properties) and outputs, all under the experiment folder.
    cell_lineage_path = headpath + "/lineage"
    nucleoid_path = headpath + "/analysis"
    lineage_output_path = headpath + "/lineage_wNucleoid"
    nucleoid_output_path = headpath + "/analysisNucsMappedToCells"    
    
    yfp_lineages = dd.read_parquet(cell_lineage_path,engine='pyarrow',calculate_divisions=True)
    mch_regprops = dd.read_parquet(nucleoid_path,engine='pyarrow',calculate_divisions=True)
    
    # Repartition to match File indices and reconcile trenches
    # Divisions need one more boundary than there are partitions, so (last File Index + 1) is
    # appended. Both tables then get one partition per File Index found in the lineage table.
    file_indices_list = sorted(yfp_lineages["File Index"].unique().compute().tolist())
    file_indices_list = file_indices_list + [file_indices_list[-1] + 1]
    
    # Index by File Index to repartition, then restore "File Parquet Index" as the index.
    yfp_lineages = yfp_lineages.reset_index().set_index("File Index",sorted=True).repartition(divisions=file_indices_list,force=True)\
                    .reset_index().set_index("File Parquet Index",sorted=True).persist()
    mch_regprops = mch_regprops.reset_index().set_index("File Index",sorted=True).repartition(divisions=file_indices_list,force=True)\
                    .reset_index().set_index("File Parquet Index",sorted=True).persist()
    
    wait(yfp_lineages);
    wait(mch_regprops);
    
    # Partition by partition (same File Index on both sides):
    #   - keep nucleoid rows whose "Trenchid Timepoint Index" also has cells;
    #   - then keep cell rows whose key also has (the already filtered) nucleoids.
    mch_regprops= dd.map_partitions(reconcile_trenches,yfp_lineages,mch_regprops,transform_divisions=False,align_dataframes=False).persist()
    yfp_lineages= dd.map_partitions(reconcile_trenches,mch_regprops,yfp_lineages,transform_divisions=False,align_dataframes=False).persist()
    wait(mch_regprops);
    wait(yfp_lineages);
    
    # Get meta df
    # Empty nucleoid frame plus the four columns added by assign_nucleoids_to_cells_df.
    # It is passed as `meta` to map_partitions below.
    # NOTE(review): .loc[-1:-1] is a label slice; it is empty only because no row has
    # File Parquet Index -1. .iloc[0:0] (as used for the lineage meta below) would state the intent directly.
    empty_output = mch_regprops.get_partition(0).compute().loc[-1:-1]
    empty_output["Segment Index"] = []
    empty_output["Segment Index"] = empty_output["Segment Index"].astype(int)
    
    empty_output["Cells Per Nucleoid"] = []
    empty_output["Cells Per Nucleoid"] = empty_output["Cells Per Nucleoid"].astype(int)
    
    empty_output["Nuc Parquet Index"] = []
    empty_output["Nuc Parquet Index"] = empty_output["Nuc Parquet Index"].astype('int64')
    
    empty_output["Cell Parquet Index"] = []
    empty_output["Cell Parquet Index"] = empty_output["Cell Parquet Index"].astype('int64')
    
    # Assign nucleoids to cells
    mch_regprops = dd.map_partitions(assign_nucleoids_to_cells_df,mch_regprops,yfp_lineages,transform_divisions=False,align_dataframes=False,meta=empty_output).persist()
    wait(mch_regprops);
    
    # Get Area, number of nucleoids per cell, and merge with cell df
    # Generate meta df
    # Empty frame whose columns are exactly those returned by assign_nuc_stats_to_cells.
    # The lineage columns are dropped again; only the empty index is kept.
    # The added columns are filled by reindex, and 'Number of Nucleoids' is then cast to int64.
    empty_df = yfp_lineages.get_partition(0).compute().iloc[-1:-1]
    cols_to_remove = empty_df.columns
    empty_df = empty_df.reindex(columns=list(empty_df.columns)+
           ['Number of Nucleoids', 'Total Nucleoid Area', 'Total Nucleoid Length',
            'Nucleoid Separation Mean', 'Nucleoid Separation CV',
            'Nucleoid Separation Min', 'Nucleoid Separation Max',
            'Nucleoid Area Mean', 'Nucleoid Area CV', 'Nucleoid Solidity Mean',
            'Nucleoid Solidity CV', 'Nucleoid Mean Intensity Mean',
            'Nucleoid Mean Intensity CV', 'Nucleoid Major Axis Length Mean',
            'Nucleoid Minor Axis Length Mean', 'Eccentricity Mean', 'Orientation Mean'])
    empty_df=empty_df.drop(columns=cols_to_remove)
    empty_df['Number of Nucleoids'] = empty_df['Number of Nucleoids'].astype('int64')   
    
    # Generate stats df
    # One row per 'Cell Parquet Index' in each partition (the index is no longer File Parquet Index).
    nuc_data_per_cell_dd = dd.map_partitions(assign_nuc_stats_to_cells,mch_regprops,transform_divisions=False,align_dataframes=False,meta=empty_df).persist()
    wait(nuc_data_per_cell_dd);
    
    # Save mcherry df
    dd.to_parquet(mch_regprops, nucleoid_output_path, engine="pyarrow", overwrite=True)
    del mch_regprops
    
    # Load again
    # Re-read the full lineage table from disk, not the reconciled/repartitioned copy above.
    # This also keeps cells from trench/timepoints that reconcile_trenches dropped.
    yfp_lineages = dd.read_parquet(cell_lineage_path,engine='pyarrow',calculate_divisions=True)
    
    # Join to stats df
    # Left join on the index: the lineage index (presumably File Parquet Index — confirm)
    # is matched against 'Cell Parquet Index'. Unassigned nucleoids (-1) presumably match no cell.
    yfp_lineages = yfp_lineages.join(nuc_data_per_cell_dd, how = 'left')
    
    # wait(yfp_lineages);
    # Cells with no assigned nucleoid get 0 for the counts, totals and the means filled below.
    # The separation statistics, all CVs, and the solidity, eccentricity and orientation means stay NaN.
    # NOTE(review): the left join likely leaves 'Number of Nucleoids' as float64; cast it if integers are needed.
    yfp_lineages['Total Nucleoid Area'] = yfp_lineages['Total Nucleoid Area'].fillna(0)
    yfp_lineages['Total Nucleoid Length'] = yfp_lineages['Total Nucleoid Length'].fillna(0)
    yfp_lineages['Number of Nucleoids'] = yfp_lineages['Number of Nucleoids'].fillna(0)
    yfp_lineages['Nucleoid Area Mean'] = yfp_lineages['Nucleoid Area Mean'].fillna(0)
    yfp_lineages['Nucleoid Mean Intensity Mean'] = yfp_lineages['Nucleoid Mean Intensity Mean'].fillna(0)
    yfp_lineages['Nucleoid Major Axis Length Mean'] = yfp_lineages['Nucleoid Major Axis Length Mean'].fillna(0)
    yfp_lineages['Nucleoid Minor Axis Length Mean'] = yfp_lineages['Nucleoid Minor Axis Length Mean'].fillna(0)
    # wait(yfp_lineages);
    
    # Compute variables involving both nuc and cell variables
    # NOTE(review): a zero 'Length' or 'area' yields inf/NaN in these ratios.
    yfp_lineages['Total Nucleoid Length per Cell Length'] = yfp_lineages['Total Nucleoid Length']/yfp_lineages['Length']
    yfp_lineages['Total Nucleoid Area per Cell Area'] = yfp_lineages['Total Nucleoid Area']/yfp_lineages['area']
    yfp_lineages['Total Number of Nucleoids per Cell Length'] = yfp_lineages['Number of Nucleoids']/yfp_lineages['Length']
    
    # Save
    dd.to_parquet(yfp_lineages, lineage_output_path, engine="pyarrow", overwrite=True)

In [9]:
# Shut down the Dask cluster started with dask_controller.startdask() once all outputs are written.
dask_controller.shutdown()

Done.


2024-02-26 20:40:00,390 - distributed.deploy.adaptive_core - INFO - Adaptive stop
