# [GSE85217] Tracking outliers - Cook's Distance

In [1]:
!jupyter-lab enable widgetsnbextension

[35m[C 2024-11-05 17:37:57.034 ServerApp][m No such file or directory: /home/thomas/Documents/git/medulloblastoma_cavalli_kaggle/code/local_version/nb/enable


In [2]:
# lib
#import modin.pandas as pd
import pandas as pd
import numpy as np
import os
from collections import OrderedDict
import umap

# fig
import matplotlib.pyplot as plt
import seaborn as sns

# local lib
import sys
sys.path.insert(1,'/home/thomas/Documents/git/medulloblastoma_cavalli_kaggle/code/local_version/fun')

from parser import Data

2024-11-05 17:38:03.469260: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-11-05 17:38:03.550920: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-11-05 17:38:03.575827: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-11-05 17:38:03.698609: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm
import statsmodels.formula.api as smf

In [4]:
from scipy.stats import f
from statsmodels.formula.api import glm
import statsmodels.api as sm
from concurrent.futures import ProcessPoolExecutor, as_completed

In [5]:
path_data='/home/thomas/Documents/git/medulloblastoma_cavalli_kaggle/data/in/'
path_exp_mat = path_data + 'GSE85217_M_exp_763_MB_SubtypeStudy_TaylorLab_parsed.txt'
path_meta = path_data + 'GSE85217_Cavalli_subgroups_information_parsed.csv'

data=Data()
data.add_exp_mat(path_exp_mat,index_col="genes_name")
data.add_meta(path_meta=path_meta,index_col="samples_name")

In [6]:
import numpy as np
import pandas as pd
from scipy.stats import f
from statsmodels.formula.api import glm
import statsmodels.api as sm
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import shared_memory
import psutil
import time
from threading import Thread
import math

In [7]:
import multiprocessing as mp

In [8]:
import threading
import psutil
import time

class ResourceMonitor:

    def __init__(self):
        self.stop_event = threading.Event()
        self.monitor_thread = threading.Thread(target=self.monitor_resources)
        self.monitor_thread.deamon = True

    def monitor_resources(self):
        while not self.stop_event.is_set():
            print(f"RAM Usage: {psutil.virtual_memory().percent}% | CPU Usage: {psutil.cpu_percent()}%")
            time.sleep(1)

    def start(self):
        # Start the deamon thread
        self.monitor_thread.start()

    def stop(self):
        # Signal the thread to stop and wait for it to finish
        self.stop_event.set()
        self.monitor_thread.join()

    
# Usage example:
monitor = ResourceMonitor()
monitor.start()

# Main processing code goes here
time.sleep(5)  # Simulate work

# Stop the monitor when processing is done
monitor.stop()

RAM Usage: 58.2% | CPU Usage: 12.7%
RAM Usage: 58.3% | CPU Usage: 15.2%
RAM Usage: 58.3% | CPU Usage: 9.9%
RAM Usage: 58.3% | CPU Usage: 12.1%
RAM Usage: 58.3% | CPU Usage: 12.1%


In [13]:


class CookDistanceAnalyzer:

    def __init__(self, n_cores=6, batch_size = 100):
        self.n_cores = n_cores
        self.batch_size = batch_size
        self.resource_monitor = ResourceMonitor()
        
    @staticmethod
    def _print_mp(text):
        print(text)
        sys.stdout.flush()
        
    @classmethod
    def worker_mp(cls,job_id:int,worker,**kwargs):
        cls._print_mp("Processing chunk: "+str(job))
        output = worker(**kwargs)
        cls._print_mp("Finished chunk: "+str(job))
        return output
        
    @staticmethod
    def trimfn(x: float) -> int:
        return 2 if x >= 23.5 else 1 if x >= 3.5 else 0

    @staticmethod
    def trimmed_mean(x: np.ndarray, trim: float = 0.1, **kwargs) -> np.ndarray:
    
        assert trim <= 0.5
    
        kwargs.setdefault('axis',0)
        
        axis = kwargs.get("axis")
        s = np.sort(x,**kwargs)
        n = x.shape[axis]
        ntrim = math.floor(n * trim)
        return np.take(s, np.arange(ntrim, n - ntrim), axis).mean(**kwargs)

    def chunkify_exp_mat(self,exp_mat,design_series):

            ns = design_series.value_counts()
            trimratio = (1 / 3, 1 / 4, 1 / 8)

            for var in design_series.unique():
                lvl = self.trimfn(ns[var])
                yield exp_mat[design_series == var, :], lvl

    def trimmed_design_variance(self, exp_mat: np.ndarray, design_series: pd.Series) -> np.ndarray:
        return np.array([self.process_variance(exp_mat,lvl) for exp_mat,lvl in self.chunkify_exp_mat(np.array(exp_mat),design_series)]).mean(0)
            
            
    def process_variance(self,exp_mat:np.ndarray,lvl:int,trim_ratio=(1 / 3, 1 / 4, 1 / 8),scale=[2.04, 1.86, 1.51]):
        var_means = self.trimmed_mean(x=exp_mat, trim=trim_ratio[lvl], axis = 0)
        sqerror_var = (exp_mat - var_means) ** 2
        variance_means = self.trimmed_mean(sqerror_var, trim=trim_ratio[lvl], axis=0)
        variance_means*=scale[lvl]
        return variance_means

    def robust_method_of_moment_disp(self,exp_mat:np.ndarray,design_series:pd.Series):

        v = self.trimmed_design_variance(np.array(exp_mat),design_series)

        m = exp_mat.mean(axis=0)
        alpha = (v - m) / m**2

        # cannot use the typical min_disp = 1e-8 here or else all counts in the same
        # group as the outlier count will get an extreme Cook's distance
        min_disp = 0.04
        np.maximum(alpha, min_disp, out=alpha)
        return alpha

    def calculate_cook_distance(self,exp_mat:pd.DataFrame, design_series:pd.Series):

        outliers = {}

        alphas = self.robust_method_of_moment_disp(np.array(exp_mat),design_series)
        
        genes = exp_mat.columns
        exp_mat[design_series.name] = design_series

        m = len(design_series)
        p = len(design_series.unique()) + 1
        f_cutoff = f.ppf(0.99, p, m - p)

        shm_exp_mat, shared_exp_mat = self.initialize_shared_memory(exp_mat)

        self.resource_monitor.start()

        n_batches = (len(genes) + self.batch_size - 1) // self.batch_size
        for i in range(n_batches):
            gene_batch = genes[i * self.batch_size: (i + 1) * self.batch_size]
            alpha_batch = alphas[i * self.batch_size: (i + 1) * self.batch_size]
            #print(gene_batch)

            with ProcessPoolExecutor(max_workers=self.n_cores) as executor:
                futures = [executor.submit(self.gene_cook_distance, gene, alpha, exp_mat, design_series, f_cutoff) for gene, alpha in zip(gene_batch, alpha_batch)]

                for future in as_completed(futures):
                    result = future.result()
                    if result:
                        gene, outliers_index, cooks_d = result
                        outliers[gene]=(outliers_index,cook_d)

        self.resource_monitor.stop()

        shm_exp_mat.close()
        shm_exp_mat.unlink()

        return outliers


    def gene_cook_distance(self, gene, alpha, exp_mat, design_series, f_cutoff):

        model = sm.glm(formula=f"{gene} ~C({design_series.name})", data=exp_mat[[gene, design_series.name]],family=sm.families.NegativeBinomial(alpha=1//alpha)).fit()
        cooks_d = model.get_influence().cooks_distance[0]
        outliers_index = np.where(cooks_d > f_cutoff)[0]
        
        return (gene, outliers_index, cooks_d) if len(outliers_index) > 0 else None
        
    @staticmethod   
    def initialize_shared_memory(exp_mat):

        shm_exp_mat = shared_memory.SharedMemory(create=True, size=exp_mat.values.nbytes)
        shared_exp_mat = np.ndarray(exp_mat.shape, dtype=exp_mat.values.dtype, buffer=shm_exp_mat.buf)
        np.copyto(shared_exp_mat, exp_mat.values)
        return shm_exp_mat, shared_exp_mat

    def start_resource_monitor(self):
        stop_event = threading.Event()
        monitor_thread = Thread(target=self.check_monitor_resources, args=stop_event)
        monitor_thread.deamon = True
        monitor_thread.start()

        return stop_event

    @staticmethod
    def check_monitor_resources(stop_event):
        while not stop_event.is_set() :
            print(f"RAM Usage: {psutil.virtual_memory().percent}% | CPU Usage: {psutil.cpu_percent()}%")
            time.sleep(1)

    

In [14]:
import copy
cookdistana=CookDistanceAnalyzer()

#for chunks in cookdistana.chunkify_exp_mat(exp_mat=np.array(data.exp_mat.T),design_series=data.meta['Subtype']):
#    print(chunks[0])
#    print(chunks[1])
counts = copy.copy(data.exp_mat.T)
des =  copy.copy(data.meta['Subtype'])
cookdistana.trimmed_design_variance(exp_mat = counts, design_series = des)
#cookdistana.calculate_cook_distance(exp_mat=data.exp_mat.T,design_series=data.meta['Subtype'])
cookdistana.calculate_cook_distance(exp_mat = counts, design_series = des)

RAM Usage: 75.6% | CPU Usage: 6.2%
RAM Usage: 75.6% | CPU Usage: 6.2%


TypeError: cannot pickle '_thread.lock' object

RAM Usage: 75.7% | CPU Usage: 18.4%
RAM Usage: 75.7% | CPU Usage: 1.1%
RAM Usage: 75.7% | CPU Usage: 2.1%
RAM Usage: 75.4% | CPU Usage: 6.4%
RAM Usage: 75.5% | CPU Usage: 11.9%
RAM Usage: 75.5% | CPU Usage: 9.3%
RAM Usage: 75.5% | CPU Usage: 10.7%
RAM Usage: 75.5% | CPU Usage: 5.1%
RAM Usage: 75.5% | CPU Usage: 2.8%
RAM Usage: 75.4% | CPU Usage: 2.1%
RAM Usage: 75.4% | CPU Usage: 3.1%
RAM Usage: 75.4% | CPU Usage: 1.2%
RAM Usage: 75.4% | CPU Usage: 2.1%
RAM Usage: 75.3% | CPU Usage: 1.4%
RAM Usage: 75.3% | CPU Usage: 7.5%
RAM Usage: 75.4% | CPU Usage: 5.8%
RAM Usage: 75.4% | CPU Usage: 7.2%
RAM Usage: 75.4% | CPU Usage: 3.1%
RAM Usage: 75.5% | CPU Usage: 5.9%
RAM Usage: 75.4% | CPU Usage: 6.3%
RAM Usage: 75.4% | CPU Usage: 5.8%
RAM Usage: 75.4% | CPU Usage: 10.1%


In [11]:
import threading
import psutil
import time

class ResourceMonitor:
    def __init__(self):
        # Event to control when to stop the monitoring thread
        self._stop_event = threading.Event()
        self.monitor_thread = threading.Thread(target=self.monitor_resources, daemon=True)

    def monitor_resources(self):
        while not self._stop_event.is_set():
            print(f"RAM Usage: {psutil.virtual_memory().percent}% | CPU Usage: {psutil.cpu_percent()}%")
            time.sleep(1)

    def start(self):
        # Start the daemon thread
        self.monitor_thread.start()

    def stop(self):
        # Signal the thread to stop and wait for it to finish
        self._stop_event.set()
        self.monitor_thread.join()  # Wait for the thread to terminate cleanly

# Usage example:
monitor = ResourceMonitor()
monitor.start()

# Main processing code goes here
time.sleep(5)  # Simulate work

# Stop the monitor when processing is done
monitor.stop()


RAM Usage: 70.1% | CPU Usage: 21.3%
RAM Usage: 69.9% | CPU Usage: 5.3%
RAM Usage: 69.6% | CPU Usage: 2.4%
RAM Usage: 69.4% | CPU Usage: 2.5%
RAM Usage: 69.1% | CPU Usage: 1.3%
