In [20]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt 
from sklearn.metrics.pairwise import cosine_similarity
import seaborn as sns
from scipy.stats import norm
from scipy.stats import poisson
from scipy.stats import zscore
from scipy.stats import pearsonr
from scipy.stats import spearmanr
from scipy.stats import kendalltau
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import TheilSenRegressor

import copy
import textwrap

import math
import gc

In [21]:
def two_mtx_corr_heatmap_with_pval(matrix_a, matrix_b, method='pearson', figsize=(20, 10), title=None, 
                                   xtitle = None, ytitle = None, save_addr=None):
    """
    matrix_a: First DataFrame, columns are variables, rows are observations
    matrix_b: Second DataFrame, columns are variables, rows are observations, must have the same shape as matrix_a
    method: Correlation method, options are 'pearson', 'spearman', 'kendall'
    figsize: Size of the figure
    title: Title of the plot
    save_addr: Path to save the plot (PDF format)
    """
    assert matrix_a.shape == matrix_b.shape, "The two matrices must have the same shape"

    matrix_a = pd.DataFrame(matrix_a, columns = matrix_b.columns)
    corr = pd.DataFrame(np.zeros((matrix_a.shape[1], matrix_b.shape[1])), columns=matrix_b.columns, index=matrix_a.columns)
    p_values = pd.DataFrame(np.ones((matrix_a.shape[1], matrix_b.shape[1])), columns=matrix_b.columns, index=matrix_a.columns)

    for i in range(matrix_a.shape[1]):
        for j in range(matrix_b.shape[1]):
            x = matrix_a.iloc[:, i]
            y = matrix_b.iloc[:, j]
            mask = ~np.logical_or(np.isnan(x), np.isnan(y))
            if np.sum(mask) > 0:
                if method == 'pearson':
                    corr.iloc[i, j], p_values.iloc[i, j] = pearsonr(x[mask], y[mask])
                elif method == 'kendall':
                    corr.iloc[i, j], p_values.iloc[i, j] = kendalltau(x[mask], y[mask])
                elif method == 'spearman':
                    corr.iloc[i, j], p_values.iloc[i, j] = spearmanr(x[mask], y[mask])

    fig, ax = plt.subplots(figsize=figsize)
    plt.title(title, fontsize=20)

    heatmap = sns.heatmap(corr,
                          annot=True,
                          annot_kws={"fontsize": 10},
                          fmt='.2f',
                          linewidths=0.5,
                          cmap='coolwarm',
                          ax=ax,
                          cbar=True,
                          cbar_kws={'shrink': 0.3, 'aspect': 30})

    cbar = heatmap.collections[0].colorbar
    cbar.ax.tick_params(labelsize=12)
    cbar.set_label('Correlation R value', fontsize=12)

    max_corr = np.max(corr.max())
    min_corr = np.min(corr.min())

    for i in range(p_values.shape[0]):
        for j in range(p_values.shape[1]):
            p_value = p_values.iloc[i, j]
            if not np.isnan(p_value):
                correlation_value = corr.iloc[i, j]
                text_color = 'white' if correlation_value >= (max_corr - 0.4) or correlation_value <= (min_corr + 0.4) else 'black'
                if p_value <= 0.01:
                    ax.text(j + 0.5, i + 0.8, '**', ha='center', va='center', fontsize=8, color=text_color)
                elif p_value <= 0.05:
                    ax.text(j + 0.5, i + 0.8, '*', ha='center', va='center', fontsize=8, color=text_color)

    ax.set_xticklabels([textwrap.fill(label.get_text(), 12) for label in ax.get_xticklabels()], rotation=90, ha="center")
    ax.set_yticklabels([textwrap.fill(label.get_text(), 12) for label in ax.get_yticklabels()], rotation=0, ha="right")

    for label in ax.get_xticklabels():
        label.set_fontsize(12)
    for label in ax.get_yticklabels():
        label.set_fontsize(12)

    ax.set_xlabel(xtitle, fontsize=15)
    ax.set_ylabel(ytitle, fontsize=15)

    ax.grid(False)
    plt.figtext(0.1, 0.05, '**: p<=0.01; *: p<=0.05.', fontsize=15)

    if save_addr:
        plt.savefig(save_addr, format='pdf')
    plt.close()

In [22]:
def corr_heatmap_with_pval(df, method = 'pearson', figsize=(20, 10), title=None, xtitle = None, ytitle = None, save_addr = None):
  """
  df: dataframe to be used. Ensured the dataframe has been sliced to contain only the column you need. It accepts only numerical columns
  method: default uses the pearson method. It overall permits 3 methods; 'pearson', 'spearman' and 'kendall'
  figsize: default is (20, 10) but you can change it based on your preference
  title: Specify the title for your chart, default is None
  """
  # Make a copy of the df
  data = df.copy()
  # Check features correlation
  corr = data.corr(method = method)

  # Create a mask to hide the upper triangle
  mask = np.zeros_like(corr, dtype=bool)
  mask[np.triu_indices_from(mask)] = True

  # Set the diagonal elements of the mask to False to display self-correlation
  np.fill_diagonal(mask, False)

  fig, ax = plt.subplots(figsize=figsize)
  plt.title(title, fontsize=20)

  # Create the heatmap with the custom mask
  heatmap = sns.heatmap(corr,
                        annot=True,
                        annot_kws={"fontsize": 10},  # Adjust annotation font size
                        fmt='.2f',
                        linewidths=0.5,
                        cmap='coolwarm',
                        mask=mask,
                        ax=ax,
                        vmax=1,
                        vmin=-1,
                        cbar=True,
                        cbar_kws={'shrink': 0.3, 'aspect': 30})
    
  cbar = heatmap.collections[0].colorbar
  cbar.ax.tick_params(labelsize=12)
  cbar.set_label('Correlation R valUe', fontsize=12)

  # Create a function to calculate and format p-values
  p_values = np.full((corr.shape[0], corr.shape[1]), np.nan)
  for i in range(corr.shape[0]):
    for j in range(i+1, corr.shape[1]):
      x = data.iloc[:, i]
      y = data.iloc[:, j]
      mask = ~np.logical_or(np.isnan(x), np.isnan(y))
      if np.sum(mask) > 0:
        if method == 'pearson':
          p_values[i, j] = pearsonr(x[mask], y[mask])[1] #Changes based on the method chosen in the function
        elif method == 'kendall':
          p_values[i, j] = kendalltau(x[mask], y[mask])[1]
        elif method == 'spearman':
          p_values[i, j] = spearmanr(x[mask], y[mask])[1]
  
  p_values = pd.DataFrame(p_values, columns=corr.columns, index=corr.index)

  # Create a mask for the p-values heatmap
  mask_pvalues = np.triu(np.ones_like(p_values), k=1)

  # Calculate the highest and lowest correlation coefficients
  max_corr = np.max(corr.max())
  min_corr = np.min(corr.min())
  
  # Annotate the heatmap with p-values and change text color based on correlation value
  for i in range(p_values.shape[0]):
    for j in range(p_values.shape[1]):
      if mask_pvalues[i, j]:
        p_value = p_values.iloc[i, j]
        if not np.isnan(p_value):
          correlation_value = corr.iloc[i, j]
          text_color = 'white' if correlation_value >= (max_corr - 0.4) or correlation_value <= (min_corr + 0.4) else 'black'
          if p_value <= 0.01:
            #include double asterisks for p-value <= 0.01
            ax.text(i + 0.5, j + 0.8, f'**',
                    horizontalalignment='center',
                    verticalalignment='center',
                    fontsize=8,
                    color=text_color)
          elif p_value <= 0.05:
            #include single asterisks for p-value <= 0.05
            ax.text(i + 0.5, j + 0.8, f'*',
                    horizontalalignment='center',
                    verticalalignment='center',
                    fontsize=8,
                    color=text_color)
          else:
            ax.text(i + 0.5, j + 0.8, f'',
                    horizontalalignment='center',
                    verticalalignment='center',
                    fontsize=8,
                    color=text_color)

  # Customize x-axis labels
  x_labels = [textwrap.fill(label.get_text(), 12) for label in ax.get_xticklabels()]
  ax.set_xticklabels(x_labels, rotation=90, ha="center")

  # Customize y-axis labels
  y_labels = [textwrap.fill(label.get_text(), 12) for label in ax.get_yticklabels()]
  ax.set_yticklabels(y_labels, rotation=0, ha="right")

  for label in ax.get_xticklabels():
    label.set_fontsize(12)

  for label in ax.get_yticklabels():
      label.set_fontsize(12)
  
  ax.set_xlabel(xtitle, fontsize=15)
  ax.set_ylabel(ytitle, fontsize=15)

  ax.grid(False)
  # Add a footnote below and to the right side of the chart
  plt.figtext(0.1,0.05,
              '**: p<=0.01; *: p<=0.05.',
              fontsize=15)
  plt.savefig(save_addr, format='pdf')
  plt.close()
  #plt.show()

In [23]:
def plot_pack(file,residual_mtx,data, selected_channels, Output_dir,fit_method = "norm", B = None):

    #E1: distribution estimation and hist plots
    

    col_num = 6
    row_num = math.ceil(len(selected_channels) / col_num)
    fig, axs = plt.subplots(row_num, col_num, figsize=(24,4*row_num))
    fig_count = 0
    n_row = 0
    n_col = 0

    if (fit_method == "norm"):
        est_pd = pd.DataFrame(columns=data.columns,index=['mean','std'])
        for i in range(data.shape[1]):
            x = residual_mtx[:,i]
            
            threshold = np.percentile(x, 90)
            x = x[x <= threshold]

            mean, std = norm.fit(x)
            est_pd.iloc[0,i] = mean
            est_pd.iloc[1,i] = std

            if n_col >= col_num:
                n_col = 0
                n_row += 1
            
            axs[n_row,n_col].hist(x, bins=60, density=True, alpha=0.6, color='skyblue', edgecolor='black')
            xmin, xmax = axs[n_row,n_col].get_xlim()
            x_fit = np.linspace(xmin, xmax, 100)
            p_fit = norm.pdf(x_fit, mean, std)
            axs[n_row,n_col].plot(x_fit, p_fit, 'r', linewidth=2)

            axs[n_row,n_col].set_title(str(data.columns[i])+" ("+fit_method+")")
            axs[n_row,n_col].set_xlabel("Signal")
            axs[n_row,n_col].set_ylabel("Prob")
            axs[n_row,n_col].grid(True)

            n_col += 1
    elif (fit_method == "poisson"):
        est_pd = pd.DataFrame(columns=data.columns, index=['lambda'])
        for i in range(data.shape[1]):
            x = residual_mtx[:,i]
            lambda_hat = np.mean(x)
            est_pd.iloc[0,i] = lambda_hat

            if n_col >= col_num:
                n_col = 0
                n_row += 1
            
            axs[n_row,n_col].hist(x, bins=30, density=True, alpha=0.6, color='skyblue', edgecolor='black')
            xmin, xmax = axs[n_row,n_col].get_xlim()
            x_fit = np.arange(int(xmin), int(xmax))
            p_fit = poisson.pmf(x_fit, lambda_hat)
            axs[n_row,n_col].plot(x_fit, p_fit, 'r', linewidth=2)

            axs[n_row,n_col].set_title(str(data.columns[i])+" ("+fit_method+")")
            axs[n_row,n_col].set_xlabel("Signal")
            axs[n_row,n_col].set_ylabel("Prob")
            axs[n_row,n_col].grid(True)

            n_col += 1

    plt.tight_layout()
    plt.savefig(Output_dir+"/residual_distribution_across_channels_"+file+".pdf", format='pdf')  
    plt.close()




    #E1: plot row lineplots of residual
    data_for_line = copy.deepcopy(residual_mtx)
    data_for_line = pd.DataFrame(data_for_line,columns=selected_channels)
    data_for_line =data_for_line.transpose()
    data_for_line.insert(0, "channel", pd.Series(selected_channels.tolist(),index=selected_channels))

    columns = data_for_line.columns
    title = file
    # set figure size
    my_dpi=96
    plt.figure(figsize=(2500/my_dpi, 1500/my_dpi), dpi=my_dpi)

    for column in columns[1:]:
        plt.plot(data_for_line[column], label = str(column))
    plt.title(title, loc='left', fontsize=12, fontweight=0)
    plt.xticks(rotation = 90,fontsize = 10)
    plt.xlabel("Channels",fontsize = 10)
    plt.ylabel("Signal",fontsize = 10)
    plt.ylim(-100,100)
    #plt.show()

    plt.savefig(Output_dir+"/lineplot_residual_"+title+".pdf", format='pdf')  
    plt.close()

    #E1: plot correlation between residuals of channels
    data_for_simi = copy.deepcopy(residual_mtx)
    corr_heatmap_with_pval(pd.DataFrame(data_for_simi,columns=selected_channels),figsize=(40,30),
                           title='Correlation matrix between residuals of channels',
                           xtitle = "Residual", ytitle = "Residual",
                        save_addr = Output_dir+"/cor_matrix_residual_vs_res_"+title+".pdf")
                        
    #E1:
    two_mtx_corr_heatmap_with_pval(matrix_a=residual_mtx, matrix_b = data, figsize=(40, 30), 
                                   title='Correlation matrix between residuals and raw of channels', 
                                   xtitle = "Residual", ytitle = "Raw",
                                   save_addr = Output_dir+"/cor_matrix_residual_vs_raw_"+title+".pdf")

    #E1: plot similarity matrix heatmap between residuals of channels (redundant)
    title = file

    data_for_simi = copy.deepcopy(residual_mtx)
    cos_sim = cosine_similarity(data_for_simi.transpose())

    df = pd.DataFrame(cos_sim,index=selected_channels,columns=selected_channels)
        
    #plt.figure(figsize=(fig_weith, fig_height))
    sns.set_theme(style="ticks",font_scale=0.5)
    p = sns.clustermap(df,cmap=sns.color_palette("blend:#ffffff,#365E32", as_cmap=True), annot=True, fmt=".2f",cbar_pos=[0.04, 0.04, 0.02, 0.1],figsize=(16, 16))
    p.ax_cbar.set_title('Cosine Similarity')
    p.ax_heatmap.set_xlabel('Channels')
    p.ax_heatmap.set_ylabel('Channels')
    p.ax_heatmap.set_title('Similarity matrix for signal deviation \n Signal-median(Signal) \n '+ title, weight="bold", fontsize=10) 
    p.ax_col_dendrogram.set_visible(False)
    plt.savefig(Output_dir+"/cos_matrix_residual_"+title+".pdf", format='pdf')
    plt.close()

    
    #E1: scatter plot between B and channel residuals
    if B is not None:
        col_num = 6
        row_num = math.ceil(len(selected_channels) / col_num)

        plot_ptx = residual_mtx

        threshold = np.percentile(np.abs(B), 90)
        mask = np.abs(B) <= threshold # shape: (1, 1000)


        fig, axs = plt.subplots(row_num, col_num, figsize=(24, 4*row_num))

        for i in range(plot_ptx.shape[1]):
            row = i // col_num
            col = i % col_num
            
            x_data = B[mask].flatten()
            y_data = plot_ptx[mask.flatten(), i]
            
            R, P = pearsonr(x_data, y_data)
            significance = "**" if P < 0.01 else "*" if P < 0.05 else ""

            axs[row, col].scatter(x_data, y_data, s=10, alpha=0.7)
            axs[row, col].set_title(f"{selected_channels[i]} (R={R:.2f}, P={P:.2e}{significance})")
            axs[row, col].set_xlabel("Coef of fluor (count)")
            axs[row, col].set_ylabel("Residual of " + selected_channels[i])
            #axs[row, col].set_xscale('symlog')
            #axs[row, col].set_yscale('symlog')
            
            model = LinearRegression().fit(x_data.reshape(-1, 1), y_data)
            y_fit = model.predict(x_data.reshape(-1, 1))
            axs[row, col].plot(x_data, y_fit, color='red', linewidth=1)


        plt.tight_layout()
        plt.savefig(Output_dir+"/scatter_B_vs_residuals_"+file+".pdf", format='pdf')  
        plt.close()

    #E1: scatter plot between B and channel raw
    if B is not None:
        col_num = 6
        row_num = math.ceil(len(selected_channels) / col_num)

        plot_ptx = np.array(data)

        threshold = np.percentile(np.abs(B), 90)
        mask = np.abs(B) <= threshold # shape: (1, 1000)


        fig, axs = plt.subplots(row_num, col_num, figsize=(24, 4*row_num))

        for i in range(plot_ptx.shape[1]):
            row = i // col_num
            col = i % col_num
            
            x_data = B[mask].flatten()
            y_data = plot_ptx[mask.flatten(), i]
            
            R, P = pearsonr(x_data, y_data)
            significance = "**" if P < 0.01 else "*" if P < 0.05 else ""

            axs[row, col].scatter(x_data, y_data, s=10, alpha=0.7)
            axs[row, col].set_title(f"{selected_channels[i]} (R={R:.2f}, P={P:.2e}{significance})")
            axs[row, col].set_xlabel("Coef of fluor (count)")
            axs[row, col].set_ylabel("Residual of " + selected_channels[i])
            #axs[row, col].set_xscale('symlog')
            #axs[row, col].set_yscale('symlog')
            
            model = LinearRegression().fit(x_data.reshape(-1, 1), y_data)
            y_fit = model.predict(x_data.reshape(-1, 1))
            axs[row, col].plot(x_data, y_fit, color='red', linewidth=1)


        plt.tight_layout()
        plt.savefig(Output_dir+"/scatter_B_vs_raw_"+file+".pdf", format='pdf')  
        plt.close()

    
    

    return est_pd

def calculate_sigma_Y(sigma_Z, sigma_X):
    if sigma_Z < sigma_X:
        return 0
    else:
        sigma_Y = math.sqrt(sigma_Z**2 - sigma_X**2)
        return sigma_Y

def mkdir_silent(directory_name):
    try:
        os.mkdir(directory_name)
        print(f"Directory '{directory_name}' created successfully.")
    except FileExistsError:
        print(f"Directory '{directory_name}' already exists.")
    except PermissionError:
        print(f"Permission denied: Unable to create '{directory_name}'.")
    except Exception as e:
        print(f"An error occurred: {e}")


def signature_lineplot(sig,sig_name,Output_dir):
    
    f = plt.figure()
    f.set_figwidth(8)
    f.set_figheight(4)
    plt.plot(sig)
    plt.title(sig_name)
    plt.xticks(rotation = 90)
    plt.tight_layout()
    #plt.show()
    
    plt.savefig(Output_dir+"/Normalized_Sig_lineplot_"+sig_name+".pdf", format='pdf')  
    plt.close()


In [24]:
def B_bin_plot_pack(file, B, residual_mtx, selected_channels, Output_dir, bin_num=20,count_thre = 20,
                    filter_low=1,filter_high=99.9,filter_aroundzero=0.2, do_plot = False):

    # step 1 remove outlier 
    print("residual_mtx")
    print(residual_mtx.shape)
    # remove higher outlier
    threshold_high = np.percentile(B, filter_high)
    mask_high = B < threshold_high
    B_filtered_high = B[mask_high]
    rows_to_keep_high = np.where(mask_high[0])[0]
    residual_mtx_filtered_high = residual_mtx[rows_to_keep_high, :]
    print("residual_mtx_filtered_high")
    print(residual_mtx_filtered_high.shape)
    # remove lower outlier
    threshold_low = np.percentile(B, filter_low)
    mask_low = B_filtered_high > threshold_low
    B_filtered_low = B_filtered_high[mask_low]
    rows_to_keep_low = np.where(mask_low)[0]
    residual_mtx_filtered_low = residual_mtx_filtered_high[rows_to_keep_low, :]
    print("residual_mtx_filtered_low")
    print(residual_mtx_filtered_low.shape)
    #remove around zero value
    threshold_aroundzero = (1 - filter_aroundzero)* B_filtered_low.min() + filter_aroundzero * B_filtered_low.max()
    mask_aroundzero = B_filtered_low > threshold_aroundzero
    B_filtered = B_filtered_low[mask_aroundzero]
    rows_to_keep_aroundzero = np.where(mask_aroundzero)[0]
    residual_mtx_filtered = residual_mtx_filtered_low[rows_to_keep_aroundzero, :]
    print("residual_mtx_filtered")
    print(residual_mtx_filtered.shape)
    # step 2 calcualte bin size 
    bin_size = (B_filtered.max() - B_filtered.min()) / bin_num

    # step 3 initail save matrix
    mean_std_matrix = np.zeros((3, len(selected_channels), bin_num))  # 0: mean, 1: std, 2: bin midpoint
    cov_matrices = np.zeros((len(selected_channels), len(selected_channels), bin_num)) 


    
    # step 4 loop for bins
    for i in range(bin_num):
        bin_min = B_filtered.min() + i * bin_size - 1 * bin_size
        bin_max = bin_min + bin_size + 1 * bin_size
        bin_mid = (bin_min + bin_max) / 2

        bin_mask = (B_filtered >= bin_min) & (B_filtered < bin_max)
        bin_points = B_filtered[bin_mask]
        #print(f"point{i}: {bin_points.size}")

        if bin_points.size > count_thre:
            bin_indices = np.where(bin_mask)[0]
            residuals_in_bin = residual_mtx_filtered[bin_indices, :]

            means = []
            stds = []
            for col in range(residuals_in_bin.shape[1]):
                mean, std = robust_stats_zscore(residuals_in_bin[:, col])
                means.append(mean)
                stds.append(std)
            means = np.array(means)
            stds = np.array(stds)

            #print(f"stds[5]: {stds[5]}")
            mean_std_matrix[0, :, i] = means
            mean_std_matrix[1, :, i] = stds

            cov_matrix = robust_covariance_matrix(residuals_in_bin)
            cov_matrices[:, :, i] = cov_matrix

        mean_std_matrix[2, :, i] = bin_mid


    # step 5 scatter plot B vs mean
    col_num = 6
    row_num = math.ceil(len(selected_channels) / col_num)

    
    intercepts_B_vs_mean = np.zeros((1, len(selected_channels)))
    slopes_B_vs_mean = np.zeros((1, len(selected_channels)))

    fig, axs = plt.subplots(row_num, col_num, figsize=(24, 4*row_num))

    for i in range(mean_std_matrix.shape[1]):
        row = i // col_num
        col = i % col_num

        x_data = mean_std_matrix[2, i, :]#bin
        y_data = mean_std_matrix[0, i, :]#mean

        valid_mask = (mean_std_matrix[0, i, :] != 0) & (mean_std_matrix[1, i, :] != 0)#mean and std != 0
        x_data = x_data[valid_mask]
        y_data = y_data[valid_mask]

        if x_data.size > 3 and y_data.size > 3:
            R, P = pearsonr(x_data, y_data)
            significance = "**" if P < 0.01 else "*" if P < 0.05 else ""

            axs[row, col].scatter(x_data, y_data, s=10, alpha=0.7)
            axs[row, col].set_title(f"{selected_channels[i]} (R={R:.2f}, P={P:.2e}{significance})")
            axs[row, col].set_xlabel("B (bin midpoint)")
            axs[row, col].set_ylabel("Mean of " + selected_channels[i])

            #model = LinearRegression().fit(x_data.reshape(-1, 1), y_data)
            model = TheilSenRegressor().fit(x_data.reshape(-1, 1), y_data)

            y_fit = model.predict(x_data.reshape(-1, 1))
            axs[row, col].plot(x_data, y_fit, color='red', linewidth=1)

            intercepts_B_vs_mean[0, i] = model.intercept_
            slopes_B_vs_mean[0, i] = model.coef_[0]
    #if do_plot:
    plt.tight_layout()
    plt.savefig(Output_dir+"/scatter_across_channels_Bbins_vs_mean_"+file+".pdf", format='pdf')
    plt.close()

    # step 6 scatter plot B vs std
    col_num = 6
    row_num = math.ceil(len(selected_channels) / col_num)

    intercepts_B_vs_std = np.zeros((1, len(selected_channels)))
    slopes_B_vs_std = np.zeros((1, len(selected_channels)))

    fig, axs = plt.subplots(row_num, col_num, figsize=(24, 4*row_num))

    for i in range(mean_std_matrix.shape[1]):
        row = i // col_num
        col = i % col_num

        x_data = mean_std_matrix[2, i, :]#bin
        y_data = mean_std_matrix[1, i, :]#std

        valid_mask = (mean_std_matrix[0, i, :] != 0) & (mean_std_matrix[1, i, :] != 0)#mean and std != 0
        x_data = x_data[valid_mask]
        y_data = y_data[valid_mask]

        if x_data.size > 3 and y_data.size > 3:
            R, P = pearsonr(x_data, y_data)
            significance = "**" if P < 0.01 else "*" if P < 0.05 else ""

            axs[row, col].scatter(x_data, y_data, s=10, alpha=0.7)
            axs[row, col].set_title(f"{selected_channels[i]} (R={R:.2f}, P={P:.2e}{significance})")
            axs[row, col].set_xlabel("B (bin midpoint)")
            axs[row, col].set_ylabel("Std of " + selected_channels[i])

            #model = LinearRegression().fit(x_data.reshape(-1, 1), y_data)
            model = TheilSenRegressor().fit(x_data.reshape(-1, 1), y_data)
            y_fit = model.predict(x_data.reshape(-1, 1))
            axs[row, col].plot(x_data, y_fit, color='red', linewidth=1)

            intercepts_B_vs_std[0, i] = model.intercept_
            slopes_B_vs_std[0, i] = model.coef_[0]

    #if do_plot:
    plt.tight_layout()
    plt.savefig(Output_dir+"/scatter_across_channels_Bbins_vs_std_"+file+".pdf", format='pdf')
    plt.close()

    #step 7 scatter plot B vs cov
    
    intercepts_B_vs_cov = np.zeros((len(selected_channels), len(selected_channels), 1))
    slopes_B_vs_cov = np.zeros((len(selected_channels), len(selected_channels), 1))

    for j in range(cov_matrices.shape[0]):#

        col_num = 6
        row_num = math.ceil(len(selected_channels) / col_num)
        if do_plot:
            fig, axs = plt.subplots(row_num, col_num, figsize=(24, 4*row_num))

        for i in range(cov_matrices.shape[1]):
            row = i // col_num
            col = i % col_num

            x_data = mean_std_matrix[2, i, :]#bin
            y_data = cov_matrices[j, i, :]#cov

            valid_mask = (mean_std_matrix[0, i, :] != 0) & (mean_std_matrix[1, i, :] != 0)#mean and std != 0
            x_data = x_data[valid_mask]
            y_data = y_data[valid_mask]

            if x_data.size > 3 and y_data.size > 3:
                R, P = pearsonr(x_data, y_data)
                significance = "**" if P < 0.01 else "*" if P < 0.05 else ""

                if do_plot:
                    axs[row, col].scatter(x_data, y_data, s=10, alpha=0.7)
                    axs[row, col].set_title(f"Cov: {selected_channels[i]}\n vs {selected_channels[j]} \n(R={R:.2f}, P={P:.2e}{significance})")

                    axs[row, col].set_xlabel("B (bin midpoint)")
                    axs[row, col].set_ylabel("Cov")

                #model = LinearRegression().fit(x_data.reshape(-1, 1), y_data)
                model = TheilSenRegressor().fit(x_data.reshape(-1, 1), y_data)
                y_fit = model.predict(x_data.reshape(-1, 1))
                if do_plot:
                    axs[row, col].plot(x_data, y_fit, color='red', linewidth=1)

                intercepts_B_vs_cov[j, i, 0] = model.intercept_
                slopes_B_vs_cov[j, i, 0] = model.coef_[0]
        
        if do_plot:
            plt.tight_layout()
            plt.savefig(Output_dir+"/cov_"+str(j)+".pdf", format='pdf')
            plt.close()

    
    return mean_std_matrix, cov_matrices, intercepts_B_vs_mean, slopes_B_vs_mean, intercepts_B_vs_std, slopes_B_vs_std, intercepts_B_vs_cov, slopes_B_vs_cov

In [None]:
def para_from_data(data):
    """
    Compute the robust mean and standard deviation for each column of the data,
    and return the covariance matrix.

    Parameters
    ----------
    data : 2D array-like
        Input two-dimensional numeric array, where each column represents a feature.

    Returns
    -------
    mean_std_matrix : 2 x N array
        The first row contains the means, and the second row contains the standard deviations.
    cov_matrices : N x N array
        The covariance matrix.
    """

    data = np.array(data)
    num_features = data.shape[1]
    mean_std_matrix = np.zeros((2, num_features))  # 0: mean, 1: std
    for i in range(num_features):
        mean, std = robust_stats_zscore(data[:, i], threshold=3.0)
        mean_std_matrix[0, i] = mean
        mean_std_matrix[1, i] = std

    cov_matrix = robust_covariance_matrix(data, z_thresh=3.0)
    return mean_std_matrix, cov_matrix


def robust_stats_zscore(data, threshold=3.0, verbose=False):
    """
    Identify and remove outliers using the Z-score method, then return the mean and
    standard deviation of the non-outlier values.

    Parameters
    ----------
    data : array-like
        Input one-dimensional numeric array.
    threshold : float
        Z-score threshold; elements exceeding this value are considered outliers.
    verbose : bool
        Whether to print intermediate computation results.

    Returns
    -------
    mean : float
        Mean of the non-outlier values.
    std : float
        Standard deviation of the non-outlier values.
    """

    data = np.array(data)
    z_scores = zscore(data)
    mask = np.abs(z_scores) < threshold
    filtered_data = data[mask]

    if verbose:
        print(f"Z-scores: {z_scores}")
        print(f"outlier threshold: Â±{threshold}")
        print(f"non-outlier data: {filtered_data}")

    mean = filtered_data.mean()
    std = filtered_data.std()
    return mean, std

def robust_covariance_matrix(data, z_thresh=3.0):
    """
    Compute the covariance matrix after removing outliers.

    Parameters
    ----------
    data : ndarray
        Input 2D array with shape (n_samples, n_features).
    z_thresh : float
        Z-score threshold; elements exceeding this value are treated as outliers.

    Returns
    -------
    cov_matrix : ndarray
        Covariance matrix computed after excluding outliers, with shape
        (n_features, n_features).
    """

    data = np.array(data)
    n_samples, n_features = data.shape
    cleaned_data = []

    for col in range(n_features):
        col_data = data[:, col]
        z_scores = zscore(col_data)
        mask = np.abs(z_scores) < z_thresh
        # Replace outliers with NaN
        cleaned_col = np.where(mask, col_data, np.nan)
        cleaned_data.append(cleaned_col)

    # Transpose back to the original shape (n_samples, n_features)
    cleaned_data = np.array(cleaned_data).T

    # Fill NaN values with the column mean
    col_means = np.nanmean(cleaned_data, axis=0)
    filled_data = np.where(np.isnan(cleaned_data), col_means, cleaned_data)

    # Compute the covariance matrix
    cov_matrix = np.cov(filled_data, rowvar=False)
    return cov_matrix


In [None]:
#Set Data dir and Output dir
Data_dir = 'E:/ResidualModel/python/Output_01_fcs_subset_xenith/beads/' #change Data_dir for each dataset
#Data_dir = 'E:/ResidualModel/python/Output_01_fcs_subset_xenith/cells/'
#Data_dir = 'E:/ResidualModel/python/Output_01_fcs_subset_xenith/external/'
#Data_dir = 'E:/ResidualModel/python/Output_01_fcs_subset_Aurora5L/'
Output_dir = 'E:/ResidualModel/python/Output_02_calculate_parameters_pinv/Xenith/beads/' #change Output_dir accordingly
#Output_dir = 'E:/ResidualModel/python/Output_02_calculate_parameters_pinv/Xenith/cells/'
#Output_dir = 'E:/ResidualModel/python/Output_02_calculate_parameters_pinv/Xenith/external/'
#Output_dir = 'E:/ResidualModel/python/Output_02_calculate_parameters_pinv/Aurora5L/'
Data_list =  os.listdir(Data_dir)
insta = "Xenith" #Xenith or Aurora5L #change the instumental accordingly

In [None]:
pos_files = [f for f in Data_list if f.endswith('pos.fcs.pkl')]

cleaned_names = [f.removesuffix('_pos.fcs.pkl') for f in pos_files]
#cleaned_names

In [None]:
skip = False
do_plot = True
# do plot will cause much more time. Few hours for the whole dataset.

for f in cleaned_names:#cleaned_names[0:31] 62
    file_neg = f + "_neg.fcs.pkl"
    file_pos = f + "_pos.fcs.pkl"
    #file_sample = f + "_sample.fcs.pkl"
    file_sample = f + "_filtered_sample.fcs.pkl"# for [3, 14, 16, 22, 30]
    file_neg_filtered = f + "_filtered_neg.fcs.pkl"
    #file_neg_filtered = file_neg
    print(f)

    #get neg signature
    path = Data_dir+"/"+file_neg
    data_neg = pd.read_pickle(path)  

    if insta == "Xenith":
        selected_channels = data_neg.columns[19:70]
    elif insta == "Aurora5L":
        selected_channels = data_neg.columns[list(range(1,17))+list(range(19,35))+list(range(39,71))]

    data_neg = data_neg[selected_channels]
    data_neg = data_neg.reset_index(drop=True)
    neg_sig = data_neg.median()

    #get pos signature
    path = Data_dir+"/"+file_pos
    data_pos = pd.read_pickle(path)  

    if insta == "Xenith":
        selected_channels = data_pos.columns[19:70]
    elif insta == "Aurora5L":
        selected_channels = data_pos.columns[list(range(1,17))+list(range(19,35))+list(range(39,71))]

    data_pos = data_pos[selected_channels]
    data_pos = data_pos.reset_index(drop=True)
    pos_sig = data_pos.median()

    #get sig
    sig = pos_sig - neg_sig
    sig = sig / max(sig)
    AF_sig = neg_sig / max(neg_sig)

    #get data_sample
    path = Data_dir+"/"+file_sample
    data_sample = pd.read_pickle(path)  

    if insta == "Xenith":
        selected_channels = data_sample.columns[19:70]
    elif insta == "Aurora5L":
        selected_channels = data_sample.columns[list(range(1,17))+list(range(19,35))+list(range(39,71))]

    data_sample = data_sample[selected_channels]
    data_sample = data_sample.reset_index(drop=True)

    #get neg filtered signature
    path = Data_dir+"/"+file_neg_filtered
    data_neg_filtered = pd.read_pickle(path)  

    if insta == "Xenith":
        selected_channels = data_neg_filtered.columns[19:70]
    elif insta == "Aurora5L":
        selected_channels = data_neg_filtered.columns[list(range(1,17))+list(range(19,35))+list(range(39,71))]

    data_neg_filtered = data_neg_filtered[selected_channels]
    data_neg_filtered = data_neg_filtered.reset_index(drop=True)

    A_df = pd.concat([neg_sig], axis=1)
    A_df.columns = ['AF']
    A_pinv = np.linalg.pinv(A_df.values)
    unmixed_data = np.dot(A_pinv,np.array(data_neg_filtered).transpose())
    explained_data_neg = np.dot(A_df,unmixed_data)
    explained_data_neg = np.array(explained_data_neg).transpose()
    res_data_neg = data_neg_filtered - explained_data_neg

    mean_std_matrix_neg, cov_matrices_neg = para_from_data(res_data_neg)

    #fit curve
    A_df = pd.concat([sig,neg_sig], axis=1)
    A_df.columns = ['target','AF']
    A_pinv = np.linalg.pinv(A_df.values)
    B_pos = np.dot(A_pinv,np.array(data_pos).transpose())
    coef_mtx_pos = np.dot(A_df, B_pos).transpose()
    residual_mtx_pos = np.array(data_pos) - coef_mtx_pos

    B = np.dot(A_pinv,np.array(data_sample).transpose())
    coef_mtx = np.dot(A_df, B).transpose()
    residual_mtx = np.array(data_sample) - coef_mtx

    A_df = pd.concat([neg_sig], axis=1)
    A_df.columns = ['AF']
    A_pinv = np.linalg.pinv(A_df.values)
    B_neg = np.dot(A_pinv,np.array(data_neg).transpose())
    coef_mtx_neg = np.dot(A_df, B_neg).transpose()
    residual_mtx_neg = np.array(data_neg) - coef_mtx_neg

    tmp_output_dir = Output_dir+"/"+f
    mkdir_silent(tmp_output_dir)
    signature_lineplot(sig,sig_name = f,Output_dir = tmp_output_dir)
    signature_lineplot(neg_sig / max(neg_sig),sig_name = f + "_AF",Output_dir = tmp_output_dir)

    #E1: estimate residual distribution for each channel (positive points (AF + Fluor))
    if not skip:
        est_pd_pos = plot_pack(file= file_pos,residual_mtx = residual_mtx_pos, data = data_pos, 
                            selected_channels=selected_channels, Output_dir = tmp_output_dir,
                            fit_method = "norm",B = np.array(B_pos[0:1,:]))



    #E2: estimate residual distribution for each channel (negative points (AF))
    if not skip:
        est_pd_neg = plot_pack(file= file_neg,residual_mtx = residual_mtx_neg, data = data_neg, 
                        selected_channels=selected_channels, Output_dir = tmp_output_dir,
                        fit_method = "norm",B = np.array(B_neg[0:1,:]))#poisson will be more difficult and not practicle, but definitily possible for calculation


    gc.collect()

    #E3: estimate residual distribution for each channel (Fluor)
    if not skip:
        est_pd_fluor = est_pd_pos - est_pd_neg
        for i in range(len(selected_channels)):
            est_pd_fluor.iloc[1,i] = calculate_sigma_Y(sigma_Z = est_pd_pos.iloc[1,i], sigma_X = est_pd_neg.iloc[1,i])

    #E4: estimate residual distribution for each channel from all sample (pos + neg)(positive points (AF + Fluor))
        
    if f == 'SCC_Cell_SB600_CD244':
        mean_std_matrix, cov_matrices, intercepts_B_vs_mean, slopes_B_vs_mean, intercepts_B_vs_std, slopes_B_vs_std, intercepts_B_vs_cov, slopes_B_vs_cov = B_bin_plot_pack(file= file_pos, B= np.array(B[0:1,:]), residual_mtx = residual_mtx, 
                                                selected_channels=selected_channels, Output_dir= tmp_output_dir, 
                                                bin_num=10,count_thre = 10,
                                                filter_low=0.1,filter_high=95,filter_aroundzero=0.001, do_plot = do_plot)
    elif f == 'SCC_Bead_NFR700_CD4':
        mean_std_matrix, cov_matrices, intercepts_B_vs_mean, slopes_B_vs_mean, intercepts_B_vs_std, slopes_B_vs_std, intercepts_B_vs_cov, slopes_B_vs_cov = B_bin_plot_pack(file= file_pos, B= np.array(B[0:1,:]), residual_mtx = residual_mtx, 
                                                selected_channels=selected_channels, Output_dir= tmp_output_dir, 
                                                bin_num=50,count_thre = 10,
                                                filter_low=0.1,filter_high=99,filter_aroundzero=0.01, do_plot = do_plot)
    else: 
        mean_std_matrix, cov_matrices, intercepts_B_vs_mean, slopes_B_vs_mean, intercepts_B_vs_std, slopes_B_vs_std, intercepts_B_vs_cov, slopes_B_vs_cov = B_bin_plot_pack(file= file_pos, B= np.array(B[0:1,:]), residual_mtx = residual_mtx, 
                                            selected_channels=selected_channels, Output_dir= tmp_output_dir, 
                                            bin_num=30,count_thre = 10,
                                            filter_low=0.1,filter_high=99,filter_aroundzero=0.01, do_plot = do_plot)

    #save
    if not skip:
        est_pd_pos.to_csv(tmp_output_dir + '/est_pd_pos.csv', index=True)
        est_pd_neg.to_csv(tmp_output_dir + '/est_pd_neg.csv', index=True)
        est_pd_fluor.to_csv(tmp_output_dir + '/est_pd_fluor.csv', index=True)
    np.save(tmp_output_dir+"/mean_std_matrix.npy", mean_std_matrix)
    np.save(tmp_output_dir+"/cov_matrices.npy", cov_matrices)
    np.save(tmp_output_dir+"/intercepts_B_vs_mean.npy", intercepts_B_vs_mean)
    np.save(tmp_output_dir+"/slopes_B_vs_mean.npy", slopes_B_vs_mean)
    np.save(tmp_output_dir+"/intercepts_B_vs_std.npy", intercepts_B_vs_std)
    np.save(tmp_output_dir+"/slopes_B_vs_std.npy", slopes_B_vs_std)
    np.save(tmp_output_dir+"/intercepts_B_vs_cov.npy", intercepts_B_vs_cov)
    np.save(tmp_output_dir+"/slopes_B_vs_cov.npy", slopes_B_vs_cov)

    sig.to_csv(tmp_output_dir + '/sig.csv', index=True)
    neg_sig.to_csv(tmp_output_dir + '/neg_sig.csv', index=True)
    pos_sig.to_csv(tmp_output_dir + '/pos_sig.csv', index=True)
    np.save(tmp_output_dir+"/mean_std_matrix_neg.npy", mean_std_matrix_neg)
    np.save(tmp_output_dir+"/cov_matrices_neg.npy", cov_matrices_neg)
    gc.collect()



SCC_Cell_PE_KIRDL1
Directory 'E:/ResidualModel/extra_17_flow_channel_cor_spectrum/Output/SCC_Cell_PE_KIRDL1' created successfully.
residual_mtx
(8674, 51)
residual_mtx_filtered_high
(8587, 51)
residual_mtx_filtered_low
(8578, 51)
residual_mtx_filtered
(8454, 51)


In [None]:
#this function will draw some plots to estimate the scc files, which is not necessary for results estimation
def old_plot_pack(file,Data_dir,Output_dir, plot_similarity):
    file = file
    path = Data_dir+"/"+file
    data = pd.read_pickle(path)  

    if insta == "Xenith":
        selected_channels = data.columns[19:70]
    elif insta == "Aurora5L":
        selected_channels = data.columns[list(range(1,17))+list(range(19,35))+list(range(39,71))]

    data = data[selected_channels]
    data = data.reset_index(drop=True)
    #data["Id"] = range(len(data))
    #data = data.loc[0:100,:]
    cell_count = len(data)
    data = data.transpose()
    data.insert(0, "channel", pd.Series(selected_channels.tolist(),index=selected_channels))


    #plot hist
    title = file
    data_hist = data.loc[:,1:].transpose()
    num_bins = 100
    channel = selected_channels[18]

    plt.hist(data_hist[channel], num_bins, alpha=1)
    plt.title(title+"\n"+channel, loc='left', fontsize=12, fontweight=0)
    #plt.xticks(rotation = 90,fontsize = 10)
    plt.xlabel("Signal Intensity",fontsize = 10)
    plt.ylabel("Count",fontsize = 10)
    #plt.show()
    plt.savefig(Output_dir+"/hist_"+title+"_"+channel[0:11]+".pdf", format='pdf')  
    plt.close()


    #plot row lineplots
    columns = data.columns
    title = file
    # set figure size
    my_dpi=96
    plt.figure(figsize=(2500/my_dpi, 1500/my_dpi), dpi=my_dpi)

    for column in columns[1:]:
        plt.plot(data[column], label = str(column))
    plt.title(title, loc='left', fontsize=12, fontweight=0)
    plt.xticks(rotation = 90,fontsize = 10)
    plt.xlabel("Channels",fontsize = 10)
    plt.ylabel("Signal",fontsize = 10)
    #plt.show()

    plt.savefig(Output_dir+"/lineplot_"+title+".pdf", format='pdf')  
    plt.close()


    #plot normalized lineplots
    data_norm = copy.deepcopy(data)
    for column in columns[1:]:
        data_norm[column] = data_norm[column]/data_norm[column].max()

    columns = data_norm.columns
    title = file
    # set figure size
    my_dpi=96
    plt.figure(figsize=(2500/my_dpi, 1500/my_dpi), dpi=my_dpi)

    for column in columns[1:]:
        plt.plot(data_norm[column], label = str(column))
    plt.title(title, loc='left', fontsize=12, fontweight=0)
    plt.xticks(rotation = 90,fontsize = 10)
    plt.xlabel("Channels",fontsize = 10)
    plt.ylabel("Normalized Signal",fontsize = 10)
    #plt.show()

    plt.savefig(Output_dir+"/Normalized_lineplot_"+title+".pdf", format='pdf')  
    plt.close()


    # prepare for similarity matrix
    if plot_similarity:
        data_dev = data.loc[:,1:]
        for i_row in range(len(data_dev)):
            #print(i_row)
            data_dev.loc[selected_channels[i_row],:] = data_dev.loc[selected_channels[i_row],:]-np.median(data_dev.loc[selected_channels[i_row],:])

        #plot similarity matrix heatmap
        cos_sim = cosine_similarity(np.array(data_dev))
        df = pd.DataFrame(cos_sim,index=selected_channels,columns=selected_channels)
            
        #plt.figure(figsize=(fig_weith, fig_height))
        sns.set_theme(style="ticks",font_scale=0.5)
        p = sns.clustermap(df,cmap=sns.color_palette("blend:#ffffff,#365E32", as_cmap=True), annot=True, fmt=".2f",cbar_pos=[0.04, 0.04, 0.02, 0.1],figsize=(16, 16))
        p.ax_cbar.set_title('Cosine Similarity')
        p.ax_heatmap.set_xlabel('Channels')
        p.ax_heatmap.set_ylabel('Channels')
        p.ax_heatmap.set_title('Similarity matrix for signal deviation \n Signal-median(Signal) \n '+ title, weight="bold", fontsize=10) 
        p.ax_col_dendrogram.set_visible(False)
        plt.savefig(Output_dir+"/similarity_matrix"+title+".pdf", format='pdf')
        plt.close()


In [None]:
#this block will draw some plots to estimate the scc files, which is not necessary for results estimation
#plot old_plot_pack
for f in cleaned_names[0:62]: #62
    file_neg = f + "_neg.fcs.pkl"
    file_pos = f + "_pos.fcs.pkl"

    tmp_output_dir = Output_dir+"/"+f
    mkdir_silent(tmp_output_dir)

    old_plot_pack(file=file_pos,Data_dir=Data_dir,Output_dir=tmp_output_dir,plot_similarity = False)
    old_plot_pack(file=file_neg,Data_dir=Data_dir,Output_dir=tmp_output_dir,plot_similarity = False)