diff --git a/src/gcms_data_analysis/__init__.py b/src/gcms_data_analysis/__init__.py index aea19f1..e69de29 100644 --- a/src/gcms_data_analysis/__init__.py +++ b/src/gcms_data_analysis/__init__.py @@ -1 +0,0 @@ -from .main import Project, name_to_properties diff --git a/src/gcms_data_analysis/gcms.py b/src/gcms_data_analysis/gcms.py index 5c881df..ce32dfa 100644 --- a/src/gcms_data_analysis/gcms.py +++ b/src/gcms_data_analysis/gcms.py @@ -1,7 +1,11 @@ from __future__ import annotations from typing import Literal import pathlib as plib +import numpy as np import pandas as pd +import pubchempy as pcp +import ele +from gcms_data_analysis.fragmenter import Fragmenter class Project: @@ -26,7 +30,7 @@ def __init__( ] = "retention_time", plot_font: Literal["Dejavu Sans", "Times New Roman"] = "Dejavu Sans", plot_grid: bool = False, - auto_save_reports: bool = True, + auto_save_to_excel: bool = True, columns_to_rename_and_keep_in_files: dict[str, str] | None = None, compounds_to_rename_in_files: dict[str, str] | None = None, param_to_axis_label: dict[str, str] | None = None, @@ -47,7 +51,7 @@ def __init__( self.column_to_sort_values_in_samples = column_to_sort_values_in_samples self.plot_font = plot_font self.plot_grid = plot_grid - self.auto_save_reports = auto_save_reports + self.auto_save_to_excel = auto_save_to_excel if columns_to_rename_and_keep_in_files is None: self.columns_to_rename_and_keep_in_files = { @@ -322,24 +326,260 @@ def create_list_of_all_compounds(self): print(f"Info: created {len(self.list_of_all_compounds) = }") return self.list_of_all_compounds - def load_compounds_properties(self): + def create_compounds_properties( + self, update_saved_files_info: bool = True + ) -> pd.DataFrame: + """Retrieves and organizes properties for underivatized compounds using pubchempy, + updating the 'compounds_properties' attribute and saving the properties + to 'compounds_properties.xlsx'.""" + print("Info: create_compounds_properties: started") + + if self.dict_classes_to_codes is None: + self.load_class_code_frac() + if not self.list_of_all_compounds is None: + self.create_list_of_all_compounds() + # cpdf = pd.DataFrame(index=pd.Index(self.list_of_all_compounds)) + # + cpdf = pd.DataFrame() + print("Info: create_compounds_properties: looping over names") + for name in self.list_of_all_compounds: + cpdf = name_to_properties( + comp_name=name, + dict_classes_to_codes=self.dict_classes_to_codes, + dict_classes_to_mass_fractions=self.dict_classes_to_mass_fractions, + df=cpdf, + ) + # cpdf = self._order_columns_in_compounds_properties(cpdf) + # cpdf = cpdf.fillna(0) + cpdf.index.name = "comp_name" + self.compounds_properties = cpdf + # save db in the project folder in the input + if update_saved_files_info: + cpdf.to_excel(plib.Path(self.folder_path, "compounds_properties.xlsx")) + print("Info: compounds_properties created") + return self.compounds_properties + + def load_compounds_properties(self) -> pd.DataFrame: """Attempts to load the 'compounds_properties.xlsx' file containing physical and chemical properties of compounds. If not found, it creates a new properties DataFrame and updates the 'compounds_properties_created' attribute.""" - compounds_properties_path = plib.Path( - self.folder_path, "compounds_properties.xlsx" - ) - if compounds_properties_path.exists(): - cpdf = pd.read_excel( - compounds_properties_path, - index_col="comp_name", - ) - # cpdf = _order_columns_in_compounds_properties(cpdf) - # cpdf = cpdf.fillna(0) + comps_prop_path = plib.Path(self.folder_path, "compounds_properties.xlsx") + if comps_prop_path.exists(): + cpdf = pd.read_excel(comps_prop_path, index_col="comp_name") self.compounds_properties = cpdf - self.compounds_properties_created = True print("Info: compounds_properties loaded") else: - print("Warning: compounds_properties.xlsx not found, creating it") + print("Warning: compounds_properties.xlsx not found") cpdf = self.create_compounds_properties() return self.compounds_properties + + +def get_compound_from_pubchempy(comp_name: str) -> pcp.Compound: + if not isinstance(comp_name, str) or comp_name.isspace(): + print(f"WARNING get_compound_from_pubchempy got an invalid {comp_name =}") + return None + cond = True + while cond: # to deal with HTML issues on server sides (timeouts) + try: + # comp contains all info about the chemical from pubchem + try: + comp_inside_list = pcp.get_compounds(comp_name, "name") + except ValueError: + print(f"{comp_name = }") + return None + if comp_inside_list: + comp = comp_inside_list[0] + else: + print( + f"WARNING: name_to_properties {comp_name=} does not find an entry in pcp", + ) + return None + cond = False + except pcp.PubChemHTTPError: # timeout error, simply try again + print("Caught: pcp.PubChemHTTPError (keep trying)") + return comp + + +def _order_columns_in_compounds_properties( + unsorted_df: pd.DataFrame | None, +) -> pd.DataFrame | None: + if unsorted_df is None: + return None + priority_cols: list[str] = [ + "iupac_name", + "underiv_comp_name", + "molecular_formula", + "canonical_smiles", + "molecular_weight", + "xlogp", + ] + + # Define a custom sort key function + def sort_key(col): + if col in priority_cols: + return (-1, priority_cols.index(col)) + if col.startswith("el_mf"): + return (2, col) + elif col.startswith("el_"): + return (1, col) + elif col.startswith("fg_mf_unclassified"): + return (5, col) + elif col.startswith("fg_mf"): + return (4, col) + elif col.startswith("fg_"): + return (3, col) + else: + return (0, col) + + # Sort columns using the custom key + sorted_columns = sorted(unsorted_df.columns, key=sort_key) + sorted_df = unsorted_df.reindex(sorted_columns, axis=1) + sorted_df.index.name = "comp_name" + # Reindex the DataFrame with the sorted columns + return sorted_df + + +def name_to_properties( + comp_name: str, + dict_classes_to_codes: dict[str:str], + dict_classes_to_mass_fractions: dict[str:float], + df: pd.DataFrame = pd.DataFrame(), + precision_sum_elements: float = 0.05, + precision_sum_functional_group: float = 0.05, +) -> pd.DataFrame: + """ + used to retrieve chemical properties of the compound indicated by the + comp_name and to store those properties in the df + + Parameters + ---------- + GCname : str + name from GC, used as a unique key. + search_name : str + name to be used to search on pubchem. + df : pd.DataFrame + that contains all searched compounds. + df_class_code_frac : pd.DataFrame + contains the list of functional group names, codes to be searched + and the weight fraction of each one to automatically calculate the + mass fraction of each compounds for each functional group. + Classes are given as smarts and are looked into the smiles of the comp. + + Returns + ------- + df : pd.DataFrame + updated dataframe with the searched compound. + CompNotFound : str + if GCname did not yield anything CompNotFound=GCname. + + """ + + if not isinstance(df, pd.DataFrame): + raise TypeError("The argument df must be a pd.DataFrame.") + + if not isinstance(comp_name, str) or comp_name.isspace(): + return _order_columns_in_compounds_properties(df) + + if comp_name in df.index.tolist(): + return _order_columns_in_compounds_properties(df) + + comp = get_compound_from_pubchempy(comp_name) + + if comp is None: + df.loc[comp_name, "iupac_name"] = "unidentified" + return _order_columns_in_compounds_properties(df) + + try: + valid_iupac_name = comp.iupac_name.lower() + except AttributeError: # iupac_name not give + valid_iupac_name = comp_name.lower() + + df.loc[comp_name, "iupac_name"] = valid_iupac_name + df.loc[comp_name, "molecular_formula"] = comp.molecular_formula + df.loc[comp_name, "canonical_smiles"] = comp.canonical_smiles + df.loc[comp_name, "molecular_weight"] = float(comp.molecular_weight) + + try: + df.loc[comp_name, "xlogp"] = float(comp.xlogp) + except ( + TypeError + ): # float() argument must be a string or a real number, not 'NoneType' + df.loc[comp_name, "xlogp"] = np.nan + elements = set(comp.to_dict()["elements"]) + el_dict = {} + el_mf_dict = {} + + for el in elements: + el_count = comp.to_dict()["elements"].count(el) + el_mass = ele.element_from_symbol(el).mass + + # Using similar logic as in the fg_dict example + if el not in el_dict: + el_dict[el] = 0 + el_mf_dict[el] = 0.0 + + el_dict[el] += int(el_count) + el_mf_dict[el] += ( + float(el_count) * float(el_mass) / float(comp.molecular_weight) + ) + # Now, update the DataFrame in a similar way to the fg_dict example + for key, value in el_dict.items(): + df.at[comp_name, f"el_{key}"] = int(value) + + for key, value in el_mf_dict.items(): + df.at[comp_name, f"el_mf_{key}"] = float(value) + cols_el_mf = [col for col in df.columns if col.startswith("el_mf_")] + residual_els = df.loc[comp_name, cols_el_mf].sum() - 1 + # check element sum + try: + assert residual_els <= precision_sum_elements + except AssertionError: + print(f"the total mass fraction of elements in {comp_name =} is > 0.001") + # apply fragmentation using the Fragmenter class (thanks simonmb) + frg = Fragmenter( + dict_classes_to_codes, + fragmentation_scheme_order=dict_classes_to_codes.keys(), + algorithm="simple", + ) + fragmentation, _, _ = frg.fragment(comp.canonical_smiles) + fg_dict = {} + fg_mf_dict = {} + # Iterate over each item in the dictionary + for key, value in fragmentation.items(): + # Determine the root key (the part before an underscore, if present) + root_key = key.split("_")[0] + # if root_key in hetero_atoms: + # pass + # Check if the root key is in the sum_dict; if not, initialize it + if root_key not in fg_dict: + fg_dict[root_key] = 0 + fg_mf_dict[root_key] = 0 + # Add the value to the corresponding root key in the sum_dict + fg_dict[root_key] += int(fragmentation[key]) + fg_mf_dict[root_key] += ( + float(fragmentation[key]) + * float(dict_classes_to_mass_fractions[key]) + / df.loc[comp_name, "molecular_weight"].astype(float) + ) # mass fraction of total + + # Update df with fg_dict + for key, value in fg_dict.items(): + df.at[comp_name, f"fg_{key}"] = int(value) # Update the cell + # Update df with fg_mf_dict + for key, value in fg_mf_dict.items(): + df.at[comp_name, f"fg_mf_{key}"] = float(value) # Update the cell + cols_fg_mf = [col for col in df.columns if col.startswith("fg_mf")] + residual_fgs = df.loc[comp_name, cols_fg_mf].sum() - 1 + try: + assert residual_fgs <= precision_sum_functional_group + except AssertionError: + print(f"{df.loc[comp_name, cols_fg_mf].sum()=}") + print( + f"the total mass fraction of functional groups in {comp_name =} is > 0.05" + ) + if residual_fgs < -precision_sum_functional_group: + df.at[comp_name, "fg_mf_unclassified"] = abs(residual_fgs) + df.loc[df["iupac_name"] != "unidentified"] = df.loc[ + df["iupac_name"] != "unidentified" + ].fillna(0) + return _order_columns_in_compounds_properties(df) diff --git a/tests/conftest.py b/tests/conftest.py index 62c24e6..2b7ce2b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,59 +2,18 @@ import pandas as pd import numpy as np import pytest -from gcms_data_analysis.main import Project - -test_dir: plib.Path = plib.Path(__file__).parent - - -# testing name_to_properties -name_to_properties_dir = test_dir / "data_name_to_properties" - - -@pytest.fixture -def dicts_classifications_codes_fractions(): - ccf = pd.read_excel( - plib.Path( - name_to_properties_dir, - "classifications_codes_fractions.xlsx", - ) - ) - dict_class_to_code: dict[str, str] = dict( - zip( - ccf.classes.tolist(), - ccf.codes.tolist(), - ) - ) - dict_class_to_mass_fraction: dict[str, float] = dict( - zip( - ccf.classes.tolist(), - ccf.mfs.tolist(), - ) - ) - return dict_class_to_code, dict_class_to_mass_fraction - - -@pytest.fixture -def checked_n2p_compounds_properties(): - properties = pd.read_excel( - plib.Path( - name_to_properties_dir, - "checked_compounds_properties.xlsx", - ), - index_col="comp_name", - ) - return properties +from gcms_data_analysis.gcms import Project # test minimal_case +test_dir: plib.Path = plib.Path(__file__).parent minimal_case_dir = test_dir / "data_minimal_case" @pytest.fixture def gcms() -> Project: - Project.set_folder_path(minimal_case_dir) - Project.set_auto_save_to_excel(False) - return Project() + proj = Project(folder_path=minimal_case_dir, auto_save_to_excel=False) + return proj @pytest.fixture @@ -63,7 +22,7 @@ def checked_files_info(): index=pd.Index(["S_1", "S_2", "T_1", "T_2", "T_3"], name="filename"), data={ "samplename": ["S", "S", "T", "T", "T"], - "replicate_number": [1, 2, 1, 2, 3], + "replicatenumber": [1, 2, 1, 2, 3], "derivatized": [False, False, False, False, False], "calibration_file": [ "cal_minimal", @@ -86,7 +45,7 @@ def checked_created_files_info(): index=pd.Index(["S_1", "S_2", "T_1", "T_2", "T_3"], name="filename"), data={ "samplename": ["S", "S", "T", "T", "T"], - "replicate_number": ["1", "2", "1", "2", "3"], + "replicatenumber": ["1", "2", "1", "2", "3"], "derivatized": [False, False, False, False, False], "calibration_file": [False, False, False, False, False], "dilution_factor": [1, 1, 1, 1, 1], @@ -241,7 +200,7 @@ def checked_samples_info(): index=pd.Index(['S', 'T'], name='samplename'), data={ 'filename': [('S_1', 'S_2'), ('T_1', 'T_2', 'T_3')], - 'replicate_number': [(1, 2), (1, 2, 3)], + 'replicatenumber': [(1, 2), (1, 2, 3)], 'derivatized': [(False, False), (False, False, False)], 'calibration_file': [('cal_minimal', 'cal_minimal'), ('cal_minimal', 'cal_minimal', 'cal_minimal')], 'dilution_factor': [(1, 1), (1, 1, 1)], @@ -273,7 +232,7 @@ def checked_samples_info_std(): index=pd.Index(['S', 'T'], name='samplename'), data={ 'filename': [('S_1', 'S_2'), ('T_1', 'T_2', 'T_3')], - 'replicate_number': [(1, 2), (1, 2, 3)], + 'replicatenumber': [(1, 2), (1, 2, 3)], 'derivatized': [(False, False), (False, False, False)], 'calibration_file': [('cal_minimal', 'cal_minimal'), ('cal_minimal', 'cal_minimal', 'cal_minimal')], 'dilution_factor': [(1, 1), (1, 1, 1)], diff --git a/tests/data_minimal_case/compounds_properties.xlsx b/tests/data_minimal_case/compounds_properties.xlsx index 89344f3..201f8ed 100644 Binary files a/tests/data_minimal_case/compounds_properties.xlsx and b/tests/data_minimal_case/compounds_properties.xlsx differ diff --git a/tests/test_name_to_properties.py b/tests/test_name_to_properties.py index 44c654f..1ba8239 100644 --- a/tests/test_name_to_properties.py +++ b/tests/test_name_to_properties.py @@ -1,20 +1,53 @@ +import pathlib as plib import pytest -from gcms_data_analysis import name_to_properties +from gcms_data_analysis.gcms import name_to_properties from pandas.testing import assert_frame_equal import pandas as pd import numpy as np +test_dir: plib.Path = plib.Path(__file__).parent + + +# testing name_to_properties +name_to_properties_dir = test_dir / "data_name_to_properties" + +ccf = pd.read_excel( + plib.Path( + name_to_properties_dir, + "classifications_codes_fractions.xlsx", + ) +) +checked_dict_class_to_code: dict[str, str] = dict( + zip( + ccf.classes.tolist(), + ccf.codes.tolist(), + ) +) +checked_dict_class_to_mass_fraction: dict[str, float] = dict( + zip( + ccf.classes.tolist(), + ccf.mfs.tolist(), + ) +) +checked_properties = pd.read_excel( + plib.Path( + name_to_properties_dir, + "checked_compounds_properties.xlsx", + ), + index_col="comp_name", +) + @pytest.mark.parametrize("compound_name", [" ", None, False, np.nan]) def test_name_to_properties_wrong_input_df_empty( - compound_name, dicts_classifications_codes_fractions + compound_name, # dict_class_to_code, dict_class_to_mass_fraction ): - dict_class_to_code, dict_class_to_mass_fraction = ( - dicts_classifications_codes_fractions - ) df = pd.DataFrame() to_check = name_to_properties( - compound_name, dict_class_to_code, dict_class_to_mass_fraction, df + compound_name, + checked_dict_class_to_code, + checked_dict_class_to_mass_fraction, + df, ) assert to_check.empty @@ -22,36 +55,29 @@ def test_name_to_properties_wrong_input_df_empty( @pytest.mark.parametrize("compound_name", [" ", None, False, np.nan]) def test_name_to_properties_wrong_input_df_not_empty( compound_name, - dicts_classifications_codes_fractions, - checked_n2p_compounds_properties, ): - dict_class_to_code, dict_class_to_mass_fraction = ( - dicts_classifications_codes_fractions - ) to_check = name_to_properties( compound_name, - dict_class_to_code, - dict_class_to_mass_fraction, - checked_n2p_compounds_properties, + checked_dict_class_to_code, + checked_dict_class_to_mass_fraction, + checked_properties, ) assert_frame_equal( to_check, - checked_n2p_compounds_properties, + checked_properties, check_exact=False, atol=1e-3, rtol=1e-3, ) -def test_name_to_properties_name_not_on_pubchem_df_empty( - dicts_classifications_codes_fractions, -): - dict_class_to_code, dict_class_to_mass_fraction = ( - dicts_classifications_codes_fractions - ) +def test_name_to_properties_name_not_on_pubchem_df_empty(): df = pd.DataFrame() to_check = name_to_properties( - "name_not_on_pcp", dict_class_to_code, dict_class_to_mass_fraction, df + "name_not_on_pcp", + checked_dict_class_to_code, + checked_dict_class_to_mass_fraction, + df, ) df.loc["name_not_on_pcp", "iupac_name"] = "unidentified" assert_frame_equal( @@ -63,25 +89,17 @@ def test_name_to_properties_name_not_on_pubchem_df_empty( ) -def test_name_to_properties_name_not_on_pubchem_df_not_empty( - dicts_classifications_codes_fractions, - checked_n2p_compounds_properties, -): - dict_class_to_code, dict_class_to_mass_fraction = ( - dicts_classifications_codes_fractions - ) +def test_name_to_properties_name_not_on_pubchem_df_not_empty(): to_check = name_to_properties( "name_not_on_pcp", - dict_class_to_code, - dict_class_to_mass_fraction, - checked_n2p_compounds_properties, - ) - checked_n2p_compounds_properties.loc["name_not_on_pcp", "iupac_name"] = ( - "unidentified" + checked_dict_class_to_code, + checked_dict_class_to_mass_fraction, + checked_properties, ) + checked_properties.loc["name_not_on_pcp", "iupac_name"] = "unidentified" assert_frame_equal( to_check, - checked_n2p_compounds_properties, + checked_properties, check_exact=False, atol=1e-5, rtol=1e-5, @@ -104,18 +122,14 @@ def test_name_to_properties_name_not_on_pubchem_df_not_empty( ], ) def test_name_to_properties_single_compounds( - compound, dicts_classifications_codes_fractions, checked_n2p_compounds_properties + compound, ): - dict_class_to_code, dict_class_to_mass_fraction = ( - dicts_classifications_codes_fractions - ) - to_check = name_to_properties( - compound, dict_class_to_code, dict_class_to_mass_fraction + compound, checked_dict_class_to_code, checked_dict_class_to_mass_fraction ) to_check = to_check.loc[[compound], :] to_check = to_check.loc[:, (to_check != 0).any(axis=0)] - checked = checked_n2p_compounds_properties.loc[[compound], :] + checked = checked_properties.loc[[compound], :] checked = checked.loc[:, (checked != 0).any(axis=0)] assert_frame_equal( to_check, @@ -126,13 +140,7 @@ def test_name_to_properties_single_compounds( ) -def test_name_to_properties_all_compounds( - dicts_classifications_codes_fractions, checked_n2p_compounds_properties -): - dict_class_to_code, dict_class_to_mass_fraction = ( - dicts_classifications_codes_fractions - ) - +def test_name_to_properties_all_compounds(): compounds = [ "2-methylcyclopent-2-en-1-one", # Comment: small ketone "hexadecanoic acid", # Comment: another compound @@ -153,9 +161,12 @@ def test_name_to_properties_all_compounds( to_check = pd.DataFrame() for compound in compounds: to_check = name_to_properties( - compound, dict_class_to_code, dict_class_to_mass_fraction, to_check + compound, + checked_dict_class_to_code, + checked_dict_class_to_mass_fraction, + to_check, ) - checked = checked_n2p_compounds_properties + checked = checked_properties assert_frame_equal( to_check, checked, diff --git a/tests/test_project_class.py b/tests/test_project_class.py index 4029cee..b36a254 100644 --- a/tests/test_project_class.py +++ b/tests/test_project_class.py @@ -11,8 +11,8 @@ # %% proj = Project( folder_path=folder_path, - auto_save_reports=False, - compounds_to_rename_in_files={"phenol": "renamed_phenol"}, + auto_save_to_excel=False, + # compounds_to_rename_in_files={"phenol": "catechol"}, ) # check a couple of defaults @@ -48,3 +48,10 @@ print(cal) # %% lac = proj.create_list_of_all_compounds() + +# %% +cpc = proj.create_compounds_properties(update_saved_files_info=False) +cpl = proj.load_compounds_properties() + +assert_frame_equal(cpc, cpl, check_exact=False, atol=1e-5, rtol=1e-5, check_dtype=False) +# %%