In [1]:
import pandas as pd
from tqdm.notebook import tqdm

from pycytominer import normalize
from pycytominer.operations.transform import Spherize
from pycytominer.operations.transform import RobustMAD

In [2]:
# Getting cp raw embeddings
def get_cp_embeddings(selected_metadata, cache_dir="CP_features"):
    """Download per-plate CP profiles and stack them into one DataFrame.

    For every row of ``selected_metadata`` the plate's parquet profile is read
    anonymously from the ``cellpainting-gallery`` S3 bucket (``cpg0016-jump``
    prefix), written locally as ``{cache_dir}/{Metadata_Plate}.parquet`` and
    appended to the result.

    :param selected_metadata: pd.DataFrame
        DataFrame that contains metadata [Metadata_Source, Metadata_Batch, Metadata_Plate]
    :param cache_dir: str
        Local directory that receives one parquet copy per plate. It is
        created on demand if it does not exist.

    :return
        DataFrame with the raw CP embeddings of all requested plates, stacked
        row-wise with a fresh integer index. An empty DataFrame is returned
        when ``selected_metadata`` has no rows.
    """
    import os

    profile_formatter = (
        "s3://cellpainting-gallery/cpg0016-jump/"
        "{Metadata_Source}/workspace/profiles/"
        "{Metadata_Batch}/{Metadata_Plate}/{Metadata_Plate}.parquet"
    )

    # Collect the per-plate frames and concatenate once at the end: calling
    # pd.concat inside the loop re-copies all previously loaded rows on every
    # iteration.
    plate_frames = []
    for _, row in tqdm(selected_metadata.iterrows(), total=len(selected_metadata)):
        s3_path = profile_formatter.format(**row.to_dict())
        plate_df = pd.read_parquet(s3_path, storage_options={"anon": True})

        # Make sure the target directory exists before the first write.
        os.makedirs(cache_dir, exist_ok=True)
        plate_name = row["Metadata_Plate"]
        plate_df.to_parquet(os.path.join(cache_dir, f"{plate_name}.parquet"))

        plate_frames.append(plate_df)

    if not plate_frames:
        # pd.concat raises on an empty list; keep the original empty result.
        return pd.DataFrame()
    return pd.concat(plate_frames, axis=0, ignore_index=True)
    
# Drop Cell Painting feature columns whose standard deviation is out of range

def drop_bad_columns(df, feature_cols=None, min_std=0.1, max_std=10):
    """Drop CP feature columns whose standard deviation is out of range.

    A feature column is removed when its standard deviation is strictly
    below ``min_std`` or strictly above ``max_std``. Columns whose standard
    deviation is NaN (e.g. a single-row frame) are kept, because neither
    comparison is true for NaN.

    :param df: pd.DataFrame
        DataFrame that contains CP embeddings
    :param feature_cols: List[str]
        List of column names that indicate CP features. When None or empty,
        every column whose name does not contain "Metadata_" is used.
    :param min_std: float
        Lower bound; features with a smaller standard deviation are dropped.
    :param max_std: float
        Upper bound; features with a larger standard deviation are dropped.

    :return
        New DataFrame with the out-of-range feature columns removed; the
        input frame is not modified and column order is preserved.
    """

    # `not feature_cols` is ambiguous (raises) for a pandas Index or numpy
    # array, so test for "missing or empty" explicitly.
    if feature_cols is None or len(feature_cols) == 0:
        feature_cols = [c for c in df.columns if "Metadata_" not in c]
    else:
        feature_cols = list(feature_cols)

    # One vectorised pass instead of one Series.std() call per column.
    stdev = df[feature_cols].std()
    out_of_range = (stdev < min_std) | (stdev > max_std)
    cols_to_drop = stdev[out_of_range].index.tolist()

    # Earlier revisions carried commented-out, name-based exclusions
    # (Nuclei_Correlation_RWC, Nuclei_Correlation_Manders,
    # Nuclei_Granularity_14/15/16); they were never active and are
    # intentionally not applied here.
    return df.drop(columns=cols_to_drop)

In [3]:

# Postprocessing

def robustMAD(df, feature_cols, control_only=False, control_smiles="CS(C)=O"):
    """Fit a RobustMAD transform and apply it to every row of ``df``.

    :param df: pd.DataFrame
        DataFrame that contains CP embeddings
    :param feature_cols: List[str]
        List of column names that indicate features
    :param control_only: bool
        When True, the transform is fitted only on rows whose
        ``Metadata_SMILES`` equals ``control_smiles``; when False it is
        fitted on the full dataset. If no control row is found, a message is
        printed and the full dataset is used instead.
    :param control_smiles: str
        SMILES string that identifies control rows. The default "CS(C)=O" is
        the SMILES of DMSO.

    :return
        Result of ``RobustMAD.transform`` on the feature columns of all rows
        of ``df`` (controls and non-controls alike).
    """

    fit_df = df.loc[:, feature_cols]
    if control_only:
        # `@control_smiles` makes query() read the local variable.
        control_df = df.query("`Metadata_SMILES` == @control_smiles").loc[:, feature_cols]
        if len(control_df) == 0:
            print("No control samples found. Fall-back to full normailization")
        else:
            fit_df = control_df

    scaler = RobustMAD()
    scaler.fit(fit_df)
    # The statistics may come from controls only, but all rows are transformed.
    return scaler.transform(df.loc[:, feature_cols])
    
def spherize(df, feature_cols, control_only=False, control_smiles="CS(C)=O"):
    """Fit a Spherize transform and apply it to every row of ``df``.

    :param df: pd.DataFrame
        DataFrame that contains CP embeddings
    :param feature_cols: List[str]
        List of column names that indicate features
    :param control_only: bool
        When True, the transform is fitted only on rows whose
        ``Metadata_SMILES`` equals ``control_smiles``; when False it is
        fitted on the full dataset. If no control row is found, a message is
        printed and the full dataset is used instead.
    :param control_smiles: str
        SMILES string that identifies control rows. The default "CS(C)=O" is
        the SMILES of DMSO.

    :return
        Result of ``Spherize.transform`` on the feature columns of all rows
        of ``df`` (controls and non-controls alike).
    """

    fit_df = df.loc[:, feature_cols]
    if control_only:
        # `@control_smiles` makes query() read the local variable.
        control_df = df.query("`Metadata_SMILES` == @control_smiles").loc[:, feature_cols]
        if len(control_df) == 0:
            print("No control samples found. Fall back to full normailization")
        else:
            fit_df = control_df

    # Named `whitener` so the local does not shadow this function's own name.
    whitener = Spherize()
    whitener.fit(fit_df)
    # The statistics may come from controls only, but all rows are transformed.
    return whitener.transform(df.loc[:, feature_cols])

# Batch-wise process

def batch_wise_spherize_and_normailize(df, batch_col="Metadata_Source", feature_cols=None, is_spherize=True, is_normalize=True, control_only=False):
    """Per-batch MAD-robust normalization followed by global spherizing.

    The input is deep-copied first, so ``df`` itself is never modified.
    Normalization is applied separately to the rows of each distinct
    ``batch_col`` value; spherizing is then applied once to the whole frame.

    :param df: pd.DataFrame
        DataFrame that contains CP embeddings
    :param batch_col: str
        Name of the column whose values define the batches
    :param feature_cols: List[str]
        List of column names that indicate features. When None or empty,
        every column that does not start with "Metadata_" is used.
    :param is_spherize: bool
        Whether to spherize
    :param is_normalize: bool
        Whether to apply the MAD-robust normalization
    :param control_only: bool
        Fit the transforms against control images only / the full dataset

    :return
        Copy of ``df`` whose feature columns hold the processed CP embeddings
    """

    _df = df.copy(deep=True)
    # `not feature_cols` is ambiguous (raises) for a pandas Index or numpy
    # array, so test for "missing or empty" explicitly.
    if feature_cols is None or len(feature_cols) == 0:
        feature_cols = [c for c in _df.columns if not c.startswith('Metadata_')]
    else:
        feature_cols = list(feature_cols)

    # Skip the batch loop when normalization is disabled: without it each
    # iteration would only copy rows and write them back unchanged.
    if is_normalize:
        for batch in tqdm(_df[batch_col].unique()):
            in_batch = _df[batch_col] == batch
            # Write back only the feature columns; the DataFrame result is
            # aligned on its index, so metadata columns stay untouched.
            _df.loc[in_batch, feature_cols] = robustMAD(
                _df.loc[in_batch], feature_cols=feature_cols, control_only=control_only
            )
    if is_spherize:
        _df.loc[:, feature_cols] = spherize(_df, feature_cols=feature_cols, control_only=control_only)
    return _df

In [4]:
# Anndata evaluation

# import scib
# import anndata as ad

def transform_pd_to_ad(df, feature_col):
    """Convert a profile DataFrame into an AnnData object.

    :param df: pd.DataFrame
        DataFrame that contains CP embeddings and ``Metadata_*`` columns
    :param feature_col: List[str]
        List of column names that indicate features; they become the data
        matrix columns and ``var_names``

    :return
        AnnData whose matrix is ``df[feature_col]``, whose observations are
        named ``Cell_0``, ``Cell_1``, ... and whose ``obs`` holds every
        column starting with "Metadata_" as a categorical
    """
    # Imported here because the notebook-level `import anndata as ad` is
    # commented out; without this the function raises NameError on a fresh
    # kernel.
    import anndata as ad

    meta_cols = [c for c in df.columns if c.startswith("Metadata_")]
    features = df[feature_col].to_numpy()
    adata = ad.AnnData(features)
    adata.obs_names = [f"Cell_{i:d}" for i in range(adata.n_obs)]
    adata.var_names = feature_col
    for col in meta_cols:
        # to_numpy() drops the pandas index so values are assigned by position.
        adata.obs[col] = pd.Categorical(df[col].to_numpy())
    return adata

def calculate_metric(annData_orig, annData_untreated, annData_untreated_2, exp_key, batch_key='Metadata_Source', label_key="Metadata_SMILES"):
    """Score a baseline and two scib integrations with the same metric set.

    ``annData_untreated`` is passed to ``scib.ig.scanorama`` and
    ``annData_untreated_2`` to ``scib.ig.harmony``; ``annData_orig`` is run
    through ``scib.preprocessing.reduce_data`` (PCA, no neighbors) and serves
    as the baseline. Each result is scored with ``scib.metrics.metrics``.

    :param annData_orig: AnnData
        Data used for the baseline score (embedding ``X_pca``)
    :param annData_untreated: AnnData
        Data handed to the scanorama integration (embedding ``X_scanorama``)
    :param annData_untreated_2: AnnData
        Data handed to the harmony integration (embedding ``X_emb``)
    :param exp_key: str
        Experiment name, used as prefix of the result column names
    :param batch_key: str
        Name of the batch column. The text between its first and second
        underscore (e.g. "Source" for "Metadata_Source") is used in the
        result column names; a key without underscore is used as is.
    :param label_key: str
        Name of the label column passed to ``scib.metrics.metrics``

    :return
        DataFrame with one column per method, in the order baseline,
        scanorama, harmony, named ``{exp_key}_{tag}_{method}``
    """
    # Imported here because the notebook-level `import scib` is commented
    # out; without this the function raises NameError on a fresh kernel.
    import scib

    annData_scanorama = scib.ig.scanorama(annData_untreated, batch=batch_key)
    annData_harmony = scib.ig.harmony(annData_untreated_2, batch=batch_key)
    scib.preprocessing.reduce_data(
        annData_orig, n_top_genes=min(2000, len(annData_orig.var)), batch_key=batch_key, pca=True, neighbors=False
    )

    # "Metadata_Source" -> "Source"; fall back to the full key rather than
    # raising IndexError when the key contains no underscore.
    key_parts = batch_key.split("_")
    batch_tag = key_parts[1] if len(key_parts) > 1 else batch_key

    def _score(adata, embed, method):
        # Same metric selection for every method; only the embedding differs.
        scores = scib.metrics.metrics(
            adata, adata, embed=embed, batch_key=batch_key, label_key=label_key,
            ari_=True, nmi_=True, pcr_=False, silhouette_=True, isolated_labels_=True, graph_conn_=True,
            kBET_=False, lisi_graph_=True,
        )
        # Drop the metrics that were not computed and name the single result
        # column (labelled 0) after the experiment, batch tag and method.
        return scores.dropna().rename(columns={0: f'{exp_key}_{batch_tag}_{method}'})

    scanorama_res = _score(annData_scanorama, 'X_scanorama', 'scanorama')
    harmony_res = _score(annData_harmony, 'X_emb', 'harmony')
    orig_res = _score(annData_orig, 'X_pca', 'baseline')

    res = pd.concat([orig_res, scanorama_res, harmony_res], axis=1)
    print("##################  Finished one metric calculation complete ##################\n")
    return res