In [27]:
import pandas as pd
import numpy as np
import datetime
from filegenerators import *
from typing import Dict, List, Union, Optional
pd.options.display.float_format = '{:.2e}'.format
from pathlib import Path
import subprocess
from CovCor_calc import OptimaMechtest, OptimaOutput, OptimaSensitivity
import seaborn as sns
import os
import copy
from scipy.linalg import sqrtm, logm
import matplotlib.pyplot as plt

In [2]:
class Theoretical_Ranges:
    def __init__(self, range_file: str, sheet_name: str, input_names: list[str],
                 must_be_zero: list[str]) -> None:
        
        self.df_species_ics = pd.read_excel(range_file, sheet_name=sheet_name)
        self.df_species_ics['value'] = self.df_species_ics['value'].astype(float)
        self.must_be_zero = must_be_zero
        self.get_input_data(input_names)
        self.gen_lookuptable()

    def get_input_data(self, input_names: list[str]) -> None:
        self.inputs = {}
        for i in input_names:
            self.inputs[i] = 0.0
        self.inputs["REF"] = 1.0
        self.inputs["Insulin"] = 1e-10

        self.input_data = pd.DataFrame([
            {'species': species, 'minconc': value, 'value': value, 'maxconc': value}
            for species, value in self.inputs.items()])

    def gen_lookuptable(self) -> None:
        self.lut = pd.concat([self.df_species_ics, self.input_data], ignore_index=True)    # look-up-table
        self.lut['species'] = self.lut['species'].str.upper()
        print(f"LUT was created successfully. Its dimensions are: {self.lut.shape}")

    def makeBounds(self, old = True):
        bounds = dict()
        for index, row in self.df_species_ics.iterrows():
            if row.value < 0.1:
                lb = 1e-14
                ub = 1e-13
            elif old:
                lb = (row.value/2)*1e-12
                ub = (row.value*1.5)*1e-12
            else:
                lb = (row.minconc)*1e-12
                ub = (row.maxconc)*1e-12
            bounds[row.species] = [lb, ub]
        return bounds

In [None]:
class Basal:
    def __init__(self, rng: Theoretical_Ranges, sampling_type: str, wide = False) -> None:
        self.rng = rng
        self.wide = wide
        self.old = self.get_sampling_type(sampling_type)   # Should be 'old' or 'new'
        self.bounds = self.rng.makeBounds(self.old)
        self.get_observables()
        self.rel_sigmas = self.get_sigma_dict()
        self.get_dataPoints()

    def get_sampling_type(self, sampling_type: str) -> bool:
        if sampling_type == 'old':
            if self.wide == True:
                raise ValueError("If the sampling type is 'old', wide needs to be False.")
            else:
                self.sampling_type = 'old'
                return True
        elif sampling_type == 'new':
            if self.wide:
                self.sampling_type = 'wide_new'
            else:
                self.sampling_type = 'new'
            return False
        else:
            raise ValueError("The sampling type can either be 'old' or 'new'.")

    def __str__(self) -> str:
        kiir = f"Fields:\ndim(bounds): {len(self.bounds)}\n" + \
               f"dim(observables): {len(self.observables)}\n" + \
               f"dim(sigma_dict): {len(self.rel_sigmas)}\n" + \
               f"dim(dataPoints): {self.dataPoints.shape}"
        return kiir

    def get_observables(self) -> None:
        self.observables = []
        for index, row in self.rng.df_species_ics.iterrows():
            if row.value > 0:
                self.observables.append(row.species)

    def get_sigma_dict(self) -> dict[str, float]:
        rel_sigmas = dict()

        for index, row in self.rng.df_species_ics.iterrows():
            if row.species in self.rng.inputs.keys():
                # if row.species not in observables:
                continue
            if row.value in self.rng.must_be_zero and row.species not in self.rng.inputs.keys():
                rel_sigmas[row.species] = 5e-14
            elif row.species not in self.rng.inputs.keys() and row.species in self.observables:
                if self.wide and row.minconc != row.maxconc:
                    rel_sigmas[row.species] = ((row.maxconc-row.minconc)/8)*1e-12
                else:
                    rel_sigmas[row.species] = ((row.value*1.5-row.value/2)/8)*1e-12
        return rel_sigmas

    def get_dataPoints(self) -> None:
        columns = list(set(self.observables)-set(self.rng.inputs.keys()))
        columns.sort()
        columns.insert(0,'time')
        time = np.linspace(0,24,25)

        self.dataPoints = pd.DataFrame(columns=columns)
        self.dataPoints['time'] = time*60

        # Fill in the "theoretical" stationary conentrations
        for index, row in self.rng.df_species_ics.iterrows():
            if row.species in self.dataPoints.columns:
                if row.value == 0:
                    self.dataPoints.loc[:,row.species] = 1e-13
                else:
                    self.dataPoints.loc[:,row.species] = row.value*1e-12

In [4]:
class Genesis:  # As in it creates the xmls ~ the cells {= life ;)} with different initial conditions
    def __init__(self, basal: Basal, num_of_xmls: int,
                 output_directory: str = '/home/nvme/Opt/7_Krisztian/xml/') -> None:
        self.basal = basal
        self.num_xmls = num_of_xmls
        self.maxdigit = len(str(num_of_xmls))
        self.check_output_dir(output_directory)
        self.let_there_be_life()

    def check_output_dir(self, output_directory) -> None:
        if self.basal.old and not self.basal.wide:
            folder_name = f"Basal_{self.num_xmls}_old"
        elif not self.basal.old and not self.basal.wide:
            folder_name = f"Basal_{self.num_xmls}_new"
        else:
            folder_name = f"Basal_{self.num_xmls}_wide_new"

        self.output_dir = os.path.join(output_directory, folder_name)

        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)
            print(f"No existing directory was found at {self.output_dir}. Creating one now...")

    def let_there_be_life(self) -> None:
        species = self.basal.rng.df_species_ics.species.to_list()
        inputs = self.basal.rng.inputs
        input_names = inputs.keys()
        only_vars = list(set(species)-set(input_names))

        for i in range(1, self.num_xmls+1):
            file_index = i
            generate_file(file_index, self.output_dir, only_vars, inputs,
                          self.basal.bounds, self.basal.dataPoints,
                          self.basal.rel_sigmas, self.maxdigit)

        print("job finished")

In [26]:
class Simulation:
    def __init__(self, gen: Genesis, xmls_in_one_opp: int, opp_output_dir: str,
                 opp_name_prefix: str, all_in_one=False, kiir=True) -> None:
        self.gen = gen
        self.all_in_one = all_in_one
        self.get_xml_vec(xmls_in_one_opp)
        self.opp(opp_output_dir, opp_name_prefix, kiir)

    def get_xml_vec(self, xmls_in_one_opp) -> None:
        if self.all_in_one:
          self.xmls = np.arange(1, self.gen.num_xmls + 1, 1)
        else:
          self.xmls = []
          for i in range(1, self.gen.num_xmls, xmls_in_one_opp):
            xml_cnt = np.arange(i, i+xmls_in_one_opp, 1) 
            self.xmls.append(xml_cnt)

    def opp(self, opp_output_dir, opp_name_prefix, kiir) -> None:
        self.opps = []
        self.indices = []
        if self.all_in_one:
          opp_content = self.generate_opp_content(self.gen.output_dir, name='stac', num_xmls=self.xmls)
          opp_filename = f"{opp_name_prefix}_BCRN_corr_{self.xmls[-1]}_{self.gen.basal.sampling_type}.opp"
          self.opps.append(opp_filename)
          self.indices.append(f"{self.xmls[-1]}")
          if kiir:
            with open(os.path.join(opp_output_dir, opp_filename), "w") as f:
              f.write(opp_content)
        else:
          for num in self.xmls:
              opp_content = self.generate_opp_content(self.gen.output_dir, name='stac', num_xmls=num)
              opp_filename = f"{opp_name_prefix}_BCRN_corr_{num[-1]}_{self.gen.basal.sampling_type}.opp"
              self.opps.append(opp_filename)
              self.indices.append(f"{num[-1]}")
              if kiir:
                with open(os.path.join(opp_output_dir, opp_filename), "w") as f:
                  f.write(opp_content)

    def generate_opp_content(self, xml_folder: str, num_xmls: Union[list[int], list[list[int]]], mech_file: str = "7_Krisztian/mech/BCRN6.inp", 
                            name: str = 'stac', yaml_file: str = "7_Krisztian/mech/BCRN6.yaml", time_limit: int = 50, thread_limit: int = 32,
                            settings_tag: str = "systems_biology", solver: str = "cantera", extension: str = ".xml") -> str:

      # Create MECHMOD section
      mechmod = f"""MECHMOD
      USE_NAME         BCRN6
      MECH_FILE        {mech_file}
      COMPILE_cantera  {yaml_file}
      END
      """

      # Create MECHTEST section
      mechtest = f"""MECHTEST
      MECHANISM  BCRN6
      TIME_LIMIT {time_limit}
      THREAD_LIMIT {thread_limit}
      SETTINGS_TAG {settings_tag}
      FALLBACK_TO_DEFAULT_SETTINGS

      SOLVER {solver}
      SAVE_STATES      CSV
      """

      # Add each XML file name
      for xml in num_xmls:
          padded_number = str(xml).zfill(self.gen.maxdigit)
          mechtest += f"      NAME {xml_folder}/{name}_{padded_number}.xml\n"

      mechtest += "END\n"

      return mechmod + "\n" + mechtest

    def sim_runner(self, log_location:str = ''):
      self.parent_path = parent_path = Path.cwd().parents[1]
        
      if log_location == '':
        for idx, opp_file in enumerate(self.opps):
            command = ["bin/Release/OptimaPP", f"7_Krisztian/1_mechtest/{opp_file}"]
            print(f"Running: {' '.join(command)}")
            subprocess.run(command, check=True, cwd=parent_path)
      else:
        for idx, opp_file in enumerate(self.opps):
          command = ["bin/Release/OptimaPP", f"7_Krisztian/1_mechtest/{opp_file}"]
          print(f"Running: {' '.join(command)}")
          if self.all_in_one:
             log_idx = self.xmls[-1]
          else:
             log_idx = self.xmls[idx][-1]
          with open(f"../logs/2025526/run_log_old{log_idx}.txt", "w") as log:
              subprocess.run(command, check=True, stdout=log, stderr=subprocess.STDOUT, cwd=parent_path)


In [28]:
class Natural_Selection:
    def __init__(self, sim: Simulation) -> None:
        self.sim = sim
        self.sim_data: dict[str, OptimaMechtest] = {}
        self.get_sim_data()
        self.survival_of_the_fittest()

    def get_sim_data(self) -> None:
        for idx, key in enumerate(self.sim.indices):
            self.sim_data[key] = OptimaMechtest(self.sim.opps[idx])

    def sigma_range(self, meas, sim, sigma):
        radius = (sim-meas)/sigma
        return radius
    
    def isit_init(self, row, wide=False):
        lut = self.sim.gen.basal.rng.lut
        rel_sigmas = self.sim.gen.basal.rel_sigmas
        for k, v in row.items():
            right_row = lut[lut['species'] == k]

            if right_row.empty or k not in rel_sigmas.keys():
                #print(f"Species '{species}' not found in lut â€” skipping.")
                continue    # ezekre: BEC1A, PI3K, PI3KA, SERCAA nincsen adat a ranges tablazatban

            meas = right_row['value'].iloc[0] * 1e-12

            if not wide:
                radius = self.sigma_range(meas=meas, sim=v*1e-12, sigma=rel_sigmas[k])
            else:
                radius = self.sigma_range(meas=meas, sim=v*1e-12, sigma=rel_sigmas[k])

            if radius >= 4:
                return False

        return True
    
    def survival_of_the_fittest(self) -> None:
        self.good_xmls = []
        for idx in self.sim.indices:
            for xml_name, row in self.sim_data[idx].df_followed34.iterrows():
                all_ok = self.isit_init(row)
                if all_ok:
                    self.good_xmls.append(xml_name)
        print(f"Found {len(self.good_xmls)} good xmls")

    def filtering(self) -> None:
        data = copy.deepcopy(self.sim_data)
        first = True
        self.filtered_basal = pd.DataFrame()
        self.filtered_followed = pd.DataFrame()
        for k, v in data.items():
            v.df_basal.index = v.df_basal.index.str[7:-9]
            v.df_basal = v.df_basal.sort_index()
            if first:
                self.filtered_basal = v.df_basal[[xml in self.good_xmls for xml in v.df_basal.index]]
                self.filtered_followed = v.df_followed34[[xml in self.good_xmls for xml in v.df_followed34.index]]
                first = False
            else:
                self.filtered_basal = pd.concat([self.filtered_basal, v.df_basal[[xml in self.good_xmls for xml in v.df_basal.index]]],
                                        ignore_index=False)
                self.filtered_followed = pd.concat([self.filtered_followed, v.df_followed34[[xml in self.good_xmls for xml in v.df_followed34.index]]],
                                            ignore_index=False)

    def get_cov_cor(self, corr_xmls, keys: list[str]) -> None:
        self.dict_b = {}
        self.dict_f = {}
        self.dict_b_corr = {}
        self.dict_f_corr = {}
        self.dict_b_cov = {}
        self.dict_f_cov = {}
        for idx, range in enumerate(corr_xmls):
            self.dict_f[f"{keys[idx]}"] = self.filtered_followed.iloc[range].copy()
            self.dict_b[f"{keys[idx]}"] = self.filtered_basal.iloc[range].copy()
            self.dict_b_corr[f"{keys[idx]}"] = self.filtered_basal.iloc[range].copy().corr()
            self.dict_f_corr[f"{keys[idx]}"] = self.filtered_followed.iloc[range].copy().corr()
            self.dict_f_cov[f"{keys[idx]}"] = self.filtered_followed.iloc[range].copy().cov()
            self.dict_b_cov[f"{keys[idx]}"] = self.filtered_basal.iloc[range].copy().cov()


In [5]:
gyumik = {'alma': 'finom', 'korte': 'nem rossz', 'meggy': 'nyami', 'eper': 'nyami', 'szilva': 'nyami'}
for i in gyumik.keys():
    print(i)

alma
korte
meggy
eper
szilva


In [25]:
if __name__ == '__main__':
    input_names = ['nS', 'RAP', 'TG', 'dS', 'CCH', 'REF', 'Insulin', 'TG_SERCA', 'mTOR_RAP', 'casp', 'IP3R', 'Baxa', 'tBid']
    must_be_zero = ['casp', 'Baxa', 'tBid', 'p53a', 'PUMA']
    
    rng = Theoretical_Ranges('input_files/reactions_ics_finalised.xlsx', 'icranges',
                               input_names, must_be_zero)

    basal = Basal(rng, 'old')

    gen = Genesis(basal, 50)

    sim = Simulation(gen, 10, '../1_mechtest', 'ezaz')

LUT was created successfully. Its dimensions are: (84, 4)
job finished


In [8]:
gen.num_xmls

50