In [52]:
import os
import pandas as pd 
import polars as pl # like pandas, but much faster
import polars.selectors as cs
import numpy as np
import os, shutil, glob
from random import randint
import re, math
import datetime
import gc
from pathlib import Path

os.getcwd()


'/share/data/analyses/christa/colopaint3D_fork/spher_colo52_v1/1_Data'

### Read data

In [53]:

sourceDir = '/share/data/cellprofiler/automation/results'
rootDir = '/share/data/analyses/christa/colopaint3D_fork/spher_colo52_v1/1_Data/'


### Input directories

In [54]:
OutputDir = 'FeaturesImages_100125'
NameContains = ''
InputFolders = pl.read_csv(f'{rootDir}filemap.csv')
print(InputFolders)
            


shape: (3, 1)
┌───────────────────┐
│ feature_names     │
│ ---               │
│ str               │
╞═══════════════════╡
│ featICF_nuclei    │
│ featICF_cells     │
│ featICF_cytoplasm │
└───────────────────┘


In [55]:
now = datetime.datetime.now()
print ('Current date and time : ')
print (now.strftime('%Y-%m-%d %H:%M:%S'))

Current date and time : 
2025-01-10 14:46:00


#### Define Parameters


In [56]:
cols_to_drop = ['index','layout_id','cmpd_code', 'solvent','cmpd_conc_unit','stock_conc','stock_conc_unit','cmpd_vol', 'cmpd_vol_unit', 'well_vol', 'well_vol_unit', 'article_id','pubchemID', 'smiles', 'inkey', 'clinical_status']

use_clipping = False

std_mean = True

make_slices = True

### Define Functions


In [57]:
ObjectList = ['featICF_nuclei', 'featICF_cells', 'featICF_cytoplasm']

def print_time(msg=None):
    now = datetime.datetime.now()
    print(now.strftime('%Y-%m-%d %H:%M:%S'),msg or "")
float_columns=[pl.col(pl.Float32),pl.col(pl.Float64)]

def aggregate_mean(df_in):
    df_agg = df_in
    df_float_columns=set(list(df_agg.select(float_columns).columns))
    group_by_columns=['Metadata_PlateWell', 'Metadata_Barcode','Metadata_Well']
    other_columns=set(list(df_agg.columns))-df_float_columns-set(group_by_columns)
    # # group by mean for all float features, and group by first for all non-float columns (indices and string metadata)
    group_by_aggregates=[
        *[pl.mean(x) for x in list(df_float_columns)],
        *[pl.first(x) for x in list(other_columns)]
    ]
    df_agg_mean=df_agg.group_by(group_by_columns).agg(group_by_aggregates)
    return df_agg_mean

def aggregate_median(df_in):
    df_agg = df_in
    df_float_columns=set(list(df_agg.select(float_columns).columns))
    group_by_columns=['Metadata_Barcode','Metadata_Well']
    other_columns=set(list(df_agg.columns))-df_float_columns-set(group_by_columns)
    group_by_aggregates=[
        *[pl.median(x) for x in list(df_float_columns)],
        *[pl.first(x) for x in list(other_columns)]
    ]
    df_agg_median=df_agg.group_by(group_by_columns).agg(group_by_aggregates)
    return df_agg_median

def standardize_mean(df):
    # df = df.with_row_count('index')
    df_mean = pl.DataFrame()
    for i in range(df.select(pl.col('Metadata_Site')).max().item()):
        df_slice = df.filter(pl.col('Metadata_Site')==i)
        df_slice_DMSO=df_slice.filter(pl.col('Metadata_cmpdname')=='dmso')
        assert df_slice_DMSO.shape[0]>0, "did not find any wells 'treated' with DMSO"
        mu = df_slice_DMSO.select(float_columns).mean()
        std = df_slice_DMSO.select(float_columns).std()
        # replace 0 with 1 (specifically not clip) to avoid div by zero
        std = std.select([pl.col(c).replace({0: 1}, default=pl.col(c)) for c in std.columns])
        for i,col in enumerate(std.columns):
            if std[col].is_null().any():
                raise RuntimeError(f"some std value in column {col,i} is nan?!")
            if std[col].is_infinite().any():
                raise RuntimeError(f"some std value in column {col,i} is infinite?!")
            if (std[col]==0).any():
                raise RuntimeError(f"unexpected 0 in column {col}")
        print_time("calculated DMSO distribution for one slice")
        df_standardized_slice = df_slice.with_columns([(pl.col(c) - mu[c]) / (std[c]+0.01) for c in mu.columns])
        found_nan=False
        # checking nans:
        for i,col in enumerate(mu.columns):
            if df_standardized_slice[col].is_null().any():
                found_nan=True
                print(f"some value in column {col,i} is nan")
        if found_nan:
            raise RuntimeError("found nan")
        df_mean_slice=df_slice.with_columns([df_standardized_slice[c] for c in df_standardized_slice.columns])   
        df_mean = pl.concat([df_mean, df_mean_slice])
    # df_mean
    return df_mean

def standardize_mean_perplate(df):
    plate_list = df.select(pl.col('Metadata_Barcode')).to_series().unique().to_list()
    df_mean = pl.DataFrame()
    print(plate_list)
    for plate in plate_list:
        print(f'processing barcode {plate}')
        df_set = df.filter(pl.col('Metadata_Barcode')==plate)
        df_set = standardize_mean(df_set)
        df_mean = pl.concat([df_mean, df_set])
    return df


def normalize(df):
    df = df.with_row_count('index')
    df_norm = pl.DataFrame()
    for i in range(df.select(pl.col('Metadata_Site')).max().item()):
        df_slice = df.filter(pl.col('Metadata_Site')==i)
        df_slice_DMSO=df_slice.filter(pl.col('Metadata_cmpdname')=='dmso') # @DAVID TODO: sometimes dmso is capitalized
        assert df_slice_DMSO.shape[0]>0, "did not find any wells 'treated' with DMSO"
        maxi = df_slice_DMSO.select(float_columns).max()
        mini = df_slice_DMSO.select(float_columns).min()
        # replace 0 with 1 (specifically not clip) to avoid div by zero
        maxi = maxi.select([pl.col(c).replace({0: 1}, default=pl.col(c)) for c in maxi.columns])
        for i,col in enumerate(mini.columns):
            if maxi[col].is_null().any():
                raise RuntimeError(f"some std value in column {col,i} is nan?!")
            if maxi[col].is_infinite().any():
                raise RuntimeError(f"some std value in column {col,i} is infinite?!")
            if (maxi[col]==0).any():
                raise RuntimeError(f"unexpected 0 in column {col}")
        print_time("calculated DMSO distribution for one slice")
        df_normalized_slice = df_slice.with_columns([(pl.col(c) - mini[c]) / (maxi[c]) for c in maxi.columns])
        found_nan=False
        # checking nans:
        for i,col in enumerate(maxi.columns):
            if df_normalized_slice[col].is_null().any():
                found_nan=True
                print(f"some value in column {col,i} is nan")
        if found_nan:
            raise RuntimeError("found nan")
        df_norm_slice=df_slice.with_columns([df_normalized_slice[c] for c in df_normalized_slice.columns])   
        df_norm = pl.concat([df_norm, df_norm_slice])
    # df_mean
    return df_norm

def normalize_perplate(df):
    plate_list = df.select(pl.col('Metadata_Barcode')).to_series().unique().to_list()
    print(plate_list)
    df_mean = pl.DataFrame()
    for plate in plate_list:
        print(f'processing barcode {plate}')
        df_set = df.filter(pl.col('Metadata_Barcode')==plate)
        df_set = normalize(df_set)
        df_mean = pl.concat([df_mean, df_set])
    return df

def generate_slices(df, outdir, arg):
    df_sAgg = pl.DataFrame()
    for i in range(df.select(pl.col('Metadata_Site')).max().item()):
        df_slice = df.filter(pl.col('Metadata_Site')==i)
        if arg == 'mean':
            df_slice_agg = aggregate_mean(df_slice)
            df_slice_agg.write_parquet(f'{outdir}/{df.select(pl.col("Metadata_cell_line")).unique().item()}_Slice{i}MeanAgg.parquet')
        elif arg == 'median':
            df_slice_agg = aggregate_median(df_slice)
            df_slice_agg.write_parquet(f'{outdir}/{df.select(pl.col("Metadata_cell_line")).unique().item()}_Slice{i}MedianAgg.parquet')
        else:
            print('ERROR no valid metric')
        del df_slice
        df_sAgg = pl.concat([df_sAgg, df_slice_agg])
    return df_sAgg

def data_processing(metaEx, cl, ObjectList, OutputDir, sliceLim=13, statmet='minmax', per_plate=False):
    OutputDir = f'{OutputDir}_{statmet}'
    if per_plate:
        OutputDir = f'{OutputDir}_PerPlate'
    #Filtering Metadata and generating dirlist
    metaEx = metaEx.filter(pl.col('cell_line')==cl)
    barcodes = metaEx.select(pl.col('barcode')).unique()
    barcodes = barcodes['barcode']
    barcodes.to_list()
    dirlist = [f'{sourceDir}/{barcode}' for barcode in barcodes]

    print('Starting Processing')
    df = pl.DataFrame()
    for BaseDir in dirlist:
        mdf_op = metaEx.filter(pl.col('barcode') == BaseDir.split('/')[-1])
        image_id = mdf_op.select(pl.col('image_id')).unique().to_series().to_list()[-1]
        cp_id = mdf_op.select(pl.col('cp_id')).unique().to_series().to_list()[-1]
        print(f'{BaseDir}/{image_id}/{cp_id}')
        BaseDir = f'{BaseDir}/{image_id}/{cp_id}'      

        nuclei = pl.read_parquet(BaseDir+f'/{ObjectList[0]}.parquet')
        nuclei=nuclei.rename({x:f'{x}_nuclei' for x in nuclei.columns})
        cytoplasm = pl.read_parquet(BaseDir+f'/{ObjectList[1]}.parquet')
        cytoplasm=cytoplasm.rename({x:f'{x}_cytoplasm' for x in cytoplasm.columns}) 
        cells = pl.read_parquet(BaseDir+f'/{ObjectList[2]}.parquet')
        cells=cells.rename({x:f'{x}_cells' for x in cells.columns})
        # step 1: Take the mean values of 'multiple nuclei' belonging to one cell

        nuclei = nuclei.group_by([
            "Metadata_Barcode_nuclei","Metadata_Well_nuclei",
            "Parent_cells_nuclei", 'Metadata_Site_nuclei'
        ]).mean()

        df_one = cytoplasm.join(nuclei,
                    how='left', 
                    right_on=['Metadata_Well_nuclei', 'Metadata_Site_nuclei', 'Parent_cells_nuclei', 'Metadata_Barcode_nuclei'],
                    left_on = ['Metadata_Well_cytoplasm','Metadata_Site_cytoplasm', 'ObjectNumber_cytoplasm', 'Metadata_Barcode_cytoplasm'])
                    
        df_one = df_one.join(cells, how='left', 
                        left_on=['Metadata_Well_cytoplasm','Metadata_Site_cytoplasm','ObjectNumber_cytoplasm', 'Metadata_Barcode_cytoplasm'],
                        right_on = ['Metadata_Well_cells','Metadata_Site_cells',"ObjectNumber_cells", 'Metadata_Barcode_cells'])

        # print_time("initial merging")
        print('part1')



        # deduplicate barcode/well/site - renamed cytoplasm_Metadata* to Metadata* and removes nuclei_* etc
        unique_metadata_feature_names=['Metadata_Barcode','Metadata_Well','Metadata_Site']
        df_one=df_one.rename({f'{suffix}_cytoplasm':suffix for suffix in unique_metadata_feature_names})   ##TODO: Change this here. 
        # df = df.filter(pl.col(''))       
        # for some reason, the site is parsed as float, even though it really should be an int
        if df_one['Metadata_Site'].dtype in [np.dtype('float32'), np.dtype('float64')]:
            # sometimes, for some reason, site indices are inf/nan
            site_is_nan_mask=np.isnan(df_one['Metadata_Site'])
            site_is_inf_mask=np.isinf(df_one['Metadata_Site'])
            
            try:
                num_sites_nan=np.sum(site_is_nan_mask)
                num_sites_inf=np.sum(site_is_inf_mask)
                assert num_sites_nan==0, f"found nan site values (n = {num_sites_nan})"
                assert num_sites_inf==0, f"found inf site values (n = {num_sites_inf})"
            except AssertionError as e:
                print(f"info - this issue was automatically circumvented in the code : {e}")
                df_one=df_one[~(site_is_inf_mask|site_is_nan_mask)]
                
            num_metadata_site_entries_nonint=np.sum(np.abs(df_one['Metadata_Site']%1.0)>1e-6)
            assert num_metadata_site_entries_nonint==0, f"ERROR : {num_metadata_site_entries_nonint} imaging sites don't have integer indices. that should not be the case, and likely indicates a bug."
            
            #Should use np.round, no? TODO ask patrick. Truncation Errors are annoying.
            df_one['Metadata_Site'] = df_one['Metadata_Site'].astype(np.dtype('int32'))
        
        #Adding Compound Metadata to each row
        df_one = df_one.join(mdf_op.rename({x:f"Metadata_{x}" for x in mdf_op.columns}),left_on='Metadata_Well',right_on='Metadata_well_id')
        df_one = df_one.filter(pl.col('Metadata_Site')<sliceLim)
        df = pl.concat([df, df_one])
        plate_name = f'processed metadata for {BaseDir.split("/")[-1]}'
        print(plate_name)

    df = df.sort(pl.col('Metadata_Site'))
    ###Here should be workable to unify by cell line.
    df = df.with_columns((pl.col("Metadata_Barcode") + "_" + pl.col("Metadata_Well")).alias("Metadata_PlateWell"))
    print(df.select(pl.col('Metadata_cell_line')).to_series().unique().to_list())
    df = df.filter(pl.col('Metadata_cell_line')==cl)
    print(df.select(pl.col('Metadata_cell_line')).to_series().unique().to_list())
    ###End
    # drop all rows that contain nan
    num_rows_before_nan_trim = df.shape[0]
    for col in df.select([pl.col(pl.Float32),pl.col(pl.Float64)]).columns:
        before_drop=df.shape[0]
        df=df.filter(pl.col(col).is_not_null())
        after_drop=df.shape[0]

        num_values_dropped=before_drop-after_drop
        if num_values_dropped>0:
           print(f"dropped {num_values_dropped} rows due to NaNs in column {col}")

    num_rows_after_nan_trim = df.shape[0]
    print_time("dropped NaNs")
    print(num_rows_after_nan_trim)
    # Clip outliers
    use_clipping = False
    if use_clipping:
        print('clipping values....')
        float_cols = [c for c_name,c_dtype in zip(df.columns,df.dtypes) if "float" in str(c_dtype)]
        lower_quantile = df.select(float_cols).quantile(0.01)
        upper_quantile = df.select(float_cols).quantile(0.99)
        print("calced quantiles")
        for col in float_cols:
            df = df.with_column(
                pl.col(col).clip(lower=lower_quantile[col],upper=upper_quantile[col])
            )
        print("clipped")


    # # # df_mean=df.with_columns([df_standardized[c] for c in df_standardized.columns])
    print(per_plate)
    if statmet == 'meanstd':
        print('Standardizing by MeanSTD')
        if per_plate:
            print('standardizing per plate')
            df = standardize_mean_perplate(df)
        else:
            df = standardize_mean(df)
    if statmet == 'minmax':
        print('Normalizing')
        if per_plate:
            print('normalizing per plate')
            df = normalize_perplate(df)
        else:
            df = normalize(df)
    # df_mean = df

    # Remove unnecessary columns
    pattern = re.compile(r'FileName|PathName|ObjectNumber|ImageNumber|AcqID')
    # pattern = re.compile(r'FileName|PathName')
    columns_to_keep = [col for col in df.columns if not pattern.search(col)]
    dfOut = df.select(columns_to_keep)
    
    ScOut = f'{OutputDir}/SingleCell/'
    if not os.path.exists(ScOut): 
        os.makedirs(ScOut)
    df.write_parquet(f'{ScOut}/{dfOut.select(pl.col("Metadata_cell_line")).unique().item()}.parquet')

    # if make_slices:
    slicesOut = f'{OutputDir}/SingleSlice/'
    if not os.path.exists(slicesOut): 
        os.makedirs(slicesOut)
    df = generate_slices(dfOut, slicesOut, 'median')
    

    #Generating the output directories
    aggOut = f'{OutputDir}/WellAggregates/'
    if not os.path.exists(aggOut): 
        os.makedirs(aggOut)
    

    # df_agg_mean = aggregate_mean(df)
    # df_agg_mean.write_parquet(f'{aggOut}/{dfOut.select(pl.col("Metadata_cell_line")).unique().item()}_MeanAgg_meanstd.parquet')
    df_agg_median = aggregate_median(dfOut)
    df_agg_median.write_parquet(f'{aggOut}/{dfOut.select(pl.col("Metadata_cell_line")).unique().item()}_MedianAgg_meanstd.parquet')
    del df_agg_median
    # del df_agg_mean
    # del df_mean
    print_time("binned mean data per well")




In [58]:
metaEx = pl.read_csv(f'{rootDir}spher_colo52-metadata.csv')
metaEx = metaEx.drop(cols_to_drop)


In [59]:
metaEx.head()


well_id,image_id,cp_id,barcode,plate_well,cmpdname,cmpd_conc,pert_type,target,pathway,target_type,cell_line
str,i64,i64,str,str,str,f64,str,str,str,str,str
"""B02""",4185,5532,"""PB000137""","""PB000137_B02""","""PD0325901""",3.0,"""trt""","""MEK""","""MAPK""","""Targeted""","""HCT116"""
"""B03""",4185,5532,"""PB000137""","""PB000137_B03""","""Paclitaxel""",0.1,"""trt""","""Autophagy,Micr…","""Cytoskeletal S…","""Cytotoxic""","""HCT116"""
"""B04""",4185,5532,"""PB000137""","""PB000137_B04""","""Olaparib (AZD2…",3.0,"""trt""","""PARP""","""DNA Damage""","""Targeted""","""HCT116"""
"""B05""",4185,5532,"""PB000137""","""PB000137_B05""","""SB216763""",10.0,"""trt""","""GSK-3""","""PI3K/Akt/mTOR""","""Targeted""","""HCT116"""
"""B06""",4185,5532,"""PB000137""","""PB000137_B06""","""Vorinostat (SA…",3.0,"""trt""","""Autophagy,HDAC…","""Epigenetics""","""Targeted""","""HCT116"""


In [60]:
# metaEx.select(pl.col(['cell_line'])).unique().to_series().to_list()
# metaEx.select(pl.col(['cell_line'])).unique().to_series().to_list()
# metaEx.select(pl.col(['barcode'])).unique().to_series().to_list()

In [61]:
metaEx.select(pl.col(['cell_line'])).unique().to_series().to_list()

['HCT116', 'HT29']

In [62]:
# data_processing(metaEx, 'HCT116', ObjectList, OutputDir, statmet='meanstd', per_plate=True)

In [63]:
# data_processing(metaEx, 'HCT116', ObjectList, OutputDir, statmet='meanstd', per_plate=False)

In [64]:
# data_processing(metaEx, 'HT29', ObjectList, OutputDir, statmet='meanstd', per_plate=True)

In [65]:
metaEx.select(pl.col(['barcode'])).unique().to_series().to_list()

['PB000138', 'PB000137', 'PB000139', 'PB000140', 'PB000141', 'PB000142']

In [66]:
# data_processing(metaEx, 'HCT116', ObjectList, OutputDir)

In [67]:
# data_processing(metaEx, 'HCT116', ObjectList, OutputDir, statmet='minmax', per_plate=True)

In [68]:
# data_processing(metaEx, 'HCT116', ObjectList, OutputDir, statmet='meanstd', per_plate=False)

In [69]:
# data_processing(metaEx, 'HCT116', ObjectList, OutputDir, statmet='meanstd', per_plate=True)

In [70]:
data_processing(metaEx, 'HCT116', ObjectList, OutputDir, statmet='none', per_plate=False)

Starting Processing
/share/data/cellprofiler/automation/results/PB000139/4186/5533


part1
processed metadata for 5533
/share/data/cellprofiler/automation/results/PB000138/4188/5547
part1
processed metadata for 5547
/share/data/cellprofiler/automation/results/PB000140/4189/5573
part1
processed metadata for 5573
/share/data/cellprofiler/automation/results/PB000137/4185/5532
part1
processed metadata for 5532
['HCT116']
['HCT116']
dropped 1 rows due to NaNs in column AreaShape_FormFactor_cytoplasm
dropped 3 rows due to NaNs in column Correlation_Costes_CONC_HOECHST_cytoplasm
dropped 1 rows due to NaNs in column Correlation_Costes_MITO_HOECHST_cytoplasm
dropped 25 rows due to NaNs in column Neighbors_AngleBetweenNeighbors_Adjacent_cytoplasm
dropped 4 rows due to NaNs in column Neighbors_FirstClosestDistance_Adjacent_cytoplasm
dropped 36 rows due to NaNs in column Neighbors_AngleBetweenNeighbors_Adjacent_nuclei
dropped 4 rows due to NaNs in column Neighbors_FirstClosestDistance_Adjacent_nuclei
dropped 30 rows due to NaNs in column AreaShape_Area_cells
dropped 11 rows due to

In [71]:
# data_processing(metaEx, 'HT29', ObjectList, OutputDir, statmet='minmax', per_plate=False)

In [72]:
# data_processing(metaEx, 'HT29', ObjectList, OutputDir, statmet='minmax', per_plate=True)

In [73]:
# data_processing(metaEx, 'HT29', ObjectList, OutputDir, statmet='meanstd', per_plate=False)

In [74]:
# data_processing(metaEx, 'HT29', ObjectList, OutputDir, statmet='meanstd', per_plate=True)

In [75]:
# data_processing(metaEx, 'HT29', ObjectList, OutputDir, statmet='none', per_plate=False)

In [76]:
now = datetime.datetime.now()
print ('Current date and time : ')
print (now.strftime('%Y-%m-%d %H:%M:%S'))

Current date and time : 
2025-01-10 14:48:30


In [77]:
metaEx.select(pl.col('cp_id')).max().item()

5574