In [2]:
import anndata
import numpy as np
import os
import pandas as pd
from matplotlib import pyplot as plt
from scipy import stats
from sklearn import mixture
from sklearn.neighbors import NearestNeighbors
from typing import Any

In [12]:
PATH_TO_FOLDER = os.path.join('..', 'd4l')
TRAIN_DATA_PATH = os.path.join(PATH_TO_FOLDER, 'train')
ORIGINAL_IMAGE_DATA_SUBDIR = 'images_masks'

if PATH_TO_FOLDER is None:
    raise ValueError('Please set PATH_TO_FOLDER to a path with unzipped training data.')

ANNDATA_PATH = 'cell_data.h5ad'
TRAIN_ANNDATA_PATH = os.path.join(TRAIN_DATA_PATH, ANNDATA_PATH)

ROI_FILES_DIR_PATH = os.path.join('CELESTA', 'data', 'exprs_data')
SIGNATURE_MATRIX_CSV_PATH = os.path.join('markers_table.csv')

In [4]:
# train_anndata = anndata.read_h5ad(TRAIN_ANNDATA_PATH)
# markers = train_anndata.var['marker']
# expressions_df = pd.DataFrame(train_anndata.layers["exprs"], columns=markers)

In [9]:
def normalize(expressions, scaling_factor=5):
    """
    Transform in CELESTA.ipynb
    
    Normalizes expression data using the arcsinh transformation.
    """
    expressions_transformed = np.arcsinh(expressions) / scaling_factor
    return expressions_transformed

class Celesta:
    def __init__(self, project_name):
        # STATIC FIELDS: remain untouched after initialization
        self.project_name: str = project_name
        self.prior_info: pd.DataFrame = None # user-defined cell-type signature matrix
        self.marker_exp_matrix: np.array = None # transformed protein marker expression
        self.original_exp: np.array = None # original protein marker expression (containing only the protein markers specified in `prior_info`)
        self.cell_ID: int = None # the IDs of the cells (from 1 to the total number of cells) # list???
        self.lineage_info: pd.DataFrame = None # the lineage information from `prior_info` parsed into round, previous cell type, and cell type number columns
        self.coords: np.array = None # the x, y coordinates of each cell
        self.cell_prob: np.array = None # cell type probability for each cell
        self.final_cell_type_assignment: np.array = None # the final cell type assignments # list???
        self.nb_list: np.array = None # list of N-nearest neighbors # list???
        self.total_rounds: int = None # the maximum round value
        self.cell_nb_in_bandwidth: Any = None # the cells located within a bandwidth to cell *c*
        self.cell_nb_dist: Any = None # the distance of each cell to cell *c* within a bandwidth
        self.initial_pri_matrix: np.array = None  # user defined cell-type marker matrix for a specific round
        self.anchor_cell_type_assignment: np.array = None # the anchor cell type assignments
        self.dist_from_nearest_assigned_cell: np.array = None # the distance from the nearest assigned cell
        self.nb_cell_type: Any = None # cell types of the neighboring cells for index cells
        # NON-STATIC FIELDS: are updated after initialization
        self.marker_exp_prob: np.array = None # the marker expression probability for each cell
        self.current_scoring_matrix: np.array = None # the current scoring matrix
        self.current_pri_matrix: np.array = None # the updated cell-type marker matrix
        self.current_cell_prob: np.array = None # the current cell probability (number_cells x number_cell_type)
        self.current_cell_type_assignment: np.array = None # the current cell type assignments (number_cells x total_rounds)
        self.starting_cell_type_assignment: np.array = None # the initial cell type assignments (number_cells x total_rounds)
        self.current_beta: np.array = None # the current beta values
        self.unassigned_cells: int = None # cells not assigned a cell type for each round and iteration
        self.assigned_cells: int = None # cells with an assigned cell type


def load_imaging_data_file(roi_file_path):
    df = pd.read_csv(roi_file_path)
    return df

def get_marker_exp_matrix(prior_marker_info,
                          imaging_data_file,
                          cofactor,
                          transform_type=1):
    """
    Gets the protein marker expressions and assigns each cell a cell ID.

    Parameters
    ==========
    prior_marker_info: pd.DataFrame
        User-defined cell-type signature matrix.
    imaging_data_file: pd.DataFrame
        Segmented imaging input file.
    cofactor: int
        The cofactor used in the arcsinh transformation.
    transform_type: int
        The transformation type used to transform the protein marker expressions. Default is 1 (arcsinh transformation).

    Returns
    =======
    cell_ids: np.array
        The cell IDs.
    original_exp: np.array
        The original protein marker expression.
    marker_exp_matrix: np.array
        The transformed protein marker expression (if transform_type is not zero).
	"""
    markers_to_use = prior_marker_info.columns[2:]
    matching_markers = [col for col in markers_to_use if col in imaging_data_file.columns]

    if len(matching_markers) != len(markers_to_use):
        raise ValueError("Please double check that the protein markers in the user-defined "
                         "cell-type signature matrix (prior_marker_info) are included in the "
                         "protein markers in the segmented imaging input file "
                         "(imaging_data_file).")
    original_exp = imaging_data_file[matching_markers].to_numpy()
    cell_ids = np.arange(1, original_exp.shape[0] + 1)

    if transform_type == 1:  # arcsinh transformation
        marker_exp_matrix = np.arcsinh(original_exp / cofactor)
        return cell_ids, original_exp, marker_exp_matrix
    
    return cell_ids, original_exp, original_exp
    
def get_prior_info(prior_marker_info):
    """
    Gets the user-defined prior knowledge matrix and cell lineage information.

    Parameters
    ==========
    prior_marker_info: pd.DataFrame
        User-defined cell-type signature matrix.

    Returns
    =======
    lineage_info: pd.DataFrame
        The lineage information parsed into round, previous cell type, and cell type number columns.
    total_rounds: int
        The maximum round value.
    """
    if not any('_' in cell for cell in prior_marker_info.iloc[:, 1]):
        raise ValueError("Warning: the lineage information column has formatting errors")
    
    round_list, previous_cell_type_list, cell_type_number_list = [], [], []
    for i in range(prior_marker_info.shape[0]):
        info = list(map(int, prior_marker_info.iloc[i, 1].split('_')))
        round_list.append(info[0])
        previous_cell_type_list.append(info[1])
        cell_type_number_list.append(info[2])
    
    lineage_info = pd.DataFrame({
        'Round': round_list,
        'Previous_cell_type': previous_cell_type_list,
        'Cell_type_number': cell_type_number_list
    })
    
    total_rounds = lineage_info['Round'].max()
    
    return lineage_info, total_rounds

def get_coords(imaging_data_file):
    """
    Gets the x, y coordinates of each cell.

    Parameters
    ==========
    imaging_data_file: pd.DataFrame
        Segmented imaging input file.

    Returns
    =======
    coords: np.array
        The x, y coordinates of each cell.
    """
    coords = imaging_data_file[['X', 'Y']].to_numpy()
    return coords
    
def fit_gmm_model(marker_name, marker_exp):
    gmm_marker_param = pd.DataFrame()

    n_components = 2

    np.random.seed(42)

    zero_indices = np.where(marker_exp == 0)[0]
    zero_percentage = len(zero_indices) / marker_exp.size

    if 0.1 < zero_percentage < 0.2:
        print("Warning: The marker expression potentially has too many zeros for fitting. GMM fitting will use input expression data with reduced sparsity")
        num_of_indices_to_remove = int(np.floor(marker_exp.size * zero_percentage))
    elif 0.2 <= zero_percentage < 0.5:
        num_of_indices_to_remove = int(
            np.floor(marker_exp.size * (zero_percentage - 0.05))
        )
    elif 0.5 <= zero_percentage <= 0.9:
        num_of_indices_to_remove = int(
            np.ceil(marker_exp.size * (zero_percentage - 0.02))
        )
    elif zero_percentage >= 0.9:
        marker_exp = marker_exp[marker_exp != 0]  # Remove all-zero rows
        num_of_indices_to_remove = 0
    else:
        num_of_indices_to_remove = 0

    if num_of_indices_to_remove > 0:
        print(
            f"Warning: The marker ({marker_name}) expression potentially has too many zeros for fitting. GMM fitting will use input expression data with reduced sparsity"
        )

        marker_exp = marker_exp.drop(marker_exp.index[zero_indices[:num_of_indices_to_remove]])


    # Fit Gaussian Mixture Model
    gmm = mixture.GaussianMixture(n_components=n_components)
    gmm.fit(marker_exp)

    # Extract GMM parameters
    proportions = gmm.weights_
    means = gmm.means_
    variances = gmm.covariances_

    # Organize parameters into a matrix
    gmm_marker_param = pd.DataFrame(
        np.vstack((proportions, means.flatten(), variances.flatten()))
    )

    return gmm_marker_param

def build_sigmod_function(marker_exp_matrix):
    """Builds the sigmoid function for the calculation of the expression probability.
    
    Parameters
    ==========
    marker_exp_matrix: np.array
        The transformed protein marker expression.
    
    Returns
    =======
    sigmoid_function_parameter: np.array
        The sigmoid function parameters.
    """
    sigmoid_function_parameter = np.zeros((2, marker_exp_matrix.shape[1]))

    for i in range(marker_exp_matrix.shape[1]):
        marker_name = marker_exp_matrix.columns[i]
        marker_exp = marker_exp_matrix[:, i]
        if not isinstance(marker_name, str):
            raise ValueError("Protein marker name in the marker expression matrix has potential problem.")
        gmm_marker_param = fit_gmm_model(marker_name, marker_exp)
        proportions, means, variances = gmm_marker_param['proportions'], gmm_marker_param['means'], gmm_marker_param['variances']

        if means[0] > means[1]:
            # The first Gaussian model is for marker expressed,
            # second is for marker not expressed
            a = (-0.5 / variances[1] + 0.5 / variances[0])
            b = means[1] / variances[1] - means[0] / variances[0]
            c = 0.5 * (-means[1]**2 / variances[1] + means[0]**2 / variances[0]) + np.log(proportions[1] / proportions[0]) + 0.5 * np.log(variances[0] / variances[1])
        else:
            # The second Gaussian model is for marker expressed,
            # first is for marker not expressed
            a = (-0.5 / variances[0] + 0.5 / variances[1])
            b = means[0] / variances[0] - means[1] / variances[1]
            c = 0.5 * (-means[0]**2 / variances[0] + means[1]**2 / variances[1]) + np.log(proportions[0] / proportions[1]) + 0.5 * np.log(variances[1] / variances[0])
        
        xroot = (-b - np.sqrt(b**2 - 4.0 * a * c)) / (2.0 * a)
        slope = 1  # Assuming slope as 1
        
        sigmoid_function_parameter[0, i] = xroot
        sigmoid_function_parameter[1, i] = slope

    return sigmoid_function_parameter

# ======================not refactored=======================================================  
def create_celesta_object(
    project_title,
    prior_marker_info,
    imaging_data_file,
    cofactor=10,
    transform_type=1,
    number_of_neighbors=5,
    bandwidth=100,
	):
    """
    CreateCelestaObject in CELESTA.ipynb
    
    Initializes the following fields of the Celesta object.
    """
    celesta_obj = Celesta(project_name=project_title)

    print("Geting protein marker expressions")

    # Get protein marker expressions and cell IDs
    cell_ids, original_exp, marker_exp_matrix = get_marker_exp_matrix(
        prior_marker_info,
        imaging_data_file,
        cofactor=cofactor,
        transform_type=transform_type,
    )
    celesta_obj.cell_ID = cell_ids
    celesta_obj.original_exp = original_exp
    celesta_obj.marker_exp_matrix = marker_exp_matrix

    ##########################################################
    ### Extremely low number of cells will cause issues in Gaussian mixture fitting
    if len(celesta_obj.cell_ID) <= 10:
        print("Warning:")
        print(f"There are only {len(cell_ids)} cells in the sample.")
        print("Extremely small number of cells will cause issues in CELESTA usage.")
        return None
    ##########################################################

    # Get user-defined prior knowledge matrix and cell lineage information
    print("Geting user-defined prior knowledge matrix")

    lineage_info, total_rounds = get_prior_info(prior_marker_info)
    celesta_obj.prior_info = prior_marker_info
    celesta_obj.lineage_info = lineage_info
    celesta_obj.total_rounds = total_rounds

    # Get coordinates
    print("Geting coordinates")

    celesta_obj.coords = get_coords(imaging_data_file)

    # Convert marker expressions to marker activation probability
    print("Converting marker expressions to marker activation probability")

    celesta_obj.marker_exp_prob = CalcMarkerActivationProbability(
        celesta_obj.marker_exp_matrix
    )

    # Get neighboring cell information
    print("Getting neighboring cell information")

    nb_list, all_cell_nb_in_bandwidth, cell_nb_dist = GetNeighborInfo(
        celesta_obj.coords, number_of_neighbors, bandwidth
    )
    celesta_obj.nb_list = nb_list
    celesta_obj.cell_nb_in_bandwidth = all_cell_nb_in_bandwidth
    celesta_obj.cell_nb_dist = cell_nb_dist

    # Initialize the matrices for scoring function and probability matrix
    print("Initializing cell and scoring matrices")

    current_cell_type_assignment, current_scoring_matrix,
    current_cell_prob = InitializeCellAndScoringMatrices(
        celesta_obj.lineage_info, celesta_obj.marker_exp_matrix, celesta_obj.prior_info
    )

    celesta_obj.current_cell_type_assignment = current_cell_type_assignment
    celesta_obj.anchor_cell_type_assignment = current_cell_type_assignment
    celesta_obj.starting_cell_type_assignment = current_cell_type_assignment
    celesta_obj.current_scoring_matrix = current_scoring_matrix
    celesta_obj.current_cell_prob = current_cell_prob

    return celesta_obj
