Skip to content

Commit

Permalink
cleared __init__, added more in gcms.py,
Browse files Browse the repository at this point in the history
moved initialization in single tests
added tests
  • Loading branch information
mpecchi committed May 5, 2024
1 parent 55cff2f commit 029c898
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 119 deletions.
1 change: 0 additions & 1 deletion src/gcms_data_analysis/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from .main import Project, name_to_properties
270 changes: 255 additions & 15 deletions src/gcms_data_analysis/gcms.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from __future__ import annotations
from typing import Literal
import pathlib as plib
import numpy as np
import pandas as pd
import pubchempy as pcp
import ele
from gcms_data_analysis.fragmenter import Fragmenter


class Project:
Expand All @@ -26,7 +30,7 @@ def __init__(
] = "retention_time",
plot_font: Literal["Dejavu Sans", "Times New Roman"] = "Dejavu Sans",
plot_grid: bool = False,
auto_save_reports: bool = True,
auto_save_to_excel: bool = True,
columns_to_rename_and_keep_in_files: dict[str, str] | None = None,
compounds_to_rename_in_files: dict[str, str] | None = None,
param_to_axis_label: dict[str, str] | None = None,
Expand All @@ -47,7 +51,7 @@ def __init__(
self.column_to_sort_values_in_samples = column_to_sort_values_in_samples
self.plot_font = plot_font
self.plot_grid = plot_grid
self.auto_save_reports = auto_save_reports
self.auto_save_to_excel = auto_save_to_excel

if columns_to_rename_and_keep_in_files is None:
self.columns_to_rename_and_keep_in_files = {
Expand Down Expand Up @@ -322,24 +326,260 @@ def create_list_of_all_compounds(self):
print(f"Info: created {len(self.list_of_all_compounds) = }")
return self.list_of_all_compounds

def load_compounds_properties(self):
def create_compounds_properties(
self, update_saved_files_info: bool = True
) -> pd.DataFrame:
"""Retrieves and organizes properties for underivatized compounds using pubchempy,
updating the 'compounds_properties' attribute and saving the properties
to 'compounds_properties.xlsx'."""
print("Info: create_compounds_properties: started")

if self.dict_classes_to_codes is None:
self.load_class_code_frac()
if not self.list_of_all_compounds is None:
self.create_list_of_all_compounds()
# cpdf = pd.DataFrame(index=pd.Index(self.list_of_all_compounds))
#
cpdf = pd.DataFrame()
print("Info: create_compounds_properties: looping over names")
for name in self.list_of_all_compounds:
cpdf = name_to_properties(
comp_name=name,
dict_classes_to_codes=self.dict_classes_to_codes,
dict_classes_to_mass_fractions=self.dict_classes_to_mass_fractions,
df=cpdf,
)
# cpdf = self._order_columns_in_compounds_properties(cpdf)
# cpdf = cpdf.fillna(0)
cpdf.index.name = "comp_name"
self.compounds_properties = cpdf
# save db in the project folder in the input
if update_saved_files_info:
cpdf.to_excel(plib.Path(self.folder_path, "compounds_properties.xlsx"))
print("Info: compounds_properties created")
return self.compounds_properties

def load_compounds_properties(self) -> pd.DataFrame:
"""Attempts to load the 'compounds_properties.xlsx' file containing physical
and chemical properties of compounds. If not found, it creates a new properties
DataFrame and updates the 'compounds_properties_created' attribute."""
compounds_properties_path = plib.Path(
self.folder_path, "compounds_properties.xlsx"
)
if compounds_properties_path.exists():
cpdf = pd.read_excel(
compounds_properties_path,
index_col="comp_name",
)
# cpdf = _order_columns_in_compounds_properties(cpdf)
# cpdf = cpdf.fillna(0)
comps_prop_path = plib.Path(self.folder_path, "compounds_properties.xlsx")
if comps_prop_path.exists():
cpdf = pd.read_excel(comps_prop_path, index_col="comp_name")
self.compounds_properties = cpdf
self.compounds_properties_created = True
print("Info: compounds_properties loaded")
else:
print("Warning: compounds_properties.xlsx not found, creating it")
print("Warning: compounds_properties.xlsx not found")
cpdf = self.create_compounds_properties()
return self.compounds_properties


def get_compound_from_pubchempy(comp_name: str) -> pcp.Compound:
if not isinstance(comp_name, str) or comp_name.isspace():
print(f"WARNING get_compound_from_pubchempy got an invalid {comp_name =}")
return None
cond = True
while cond: # to deal with HTML issues on server sides (timeouts)
try:
# comp contains all info about the chemical from pubchem
try:
comp_inside_list = pcp.get_compounds(comp_name, "name")
except ValueError:
print(f"{comp_name = }")
return None
if comp_inside_list:
comp = comp_inside_list[0]
else:
print(
f"WARNING: name_to_properties {comp_name=} does not find an entry in pcp",
)
return None
cond = False
except pcp.PubChemHTTPError: # timeout error, simply try again
print("Caught: pcp.PubChemHTTPError (keep trying)")
return comp


def _order_columns_in_compounds_properties(
unsorted_df: pd.DataFrame | None,
) -> pd.DataFrame | None:
if unsorted_df is None:
return None
priority_cols: list[str] = [
"iupac_name",
"underiv_comp_name",
"molecular_formula",
"canonical_smiles",
"molecular_weight",
"xlogp",
]

# Define a custom sort key function
def sort_key(col):
if col in priority_cols:
return (-1, priority_cols.index(col))
if col.startswith("el_mf"):
return (2, col)
elif col.startswith("el_"):
return (1, col)
elif col.startswith("fg_mf_unclassified"):
return (5, col)
elif col.startswith("fg_mf"):
return (4, col)
elif col.startswith("fg_"):
return (3, col)
else:
return (0, col)

# Sort columns using the custom key
sorted_columns = sorted(unsorted_df.columns, key=sort_key)
sorted_df = unsorted_df.reindex(sorted_columns, axis=1)
sorted_df.index.name = "comp_name"
# Reindex the DataFrame with the sorted columns
return sorted_df


def name_to_properties(
comp_name: str,
dict_classes_to_codes: dict[str:str],
dict_classes_to_mass_fractions: dict[str:float],
df: pd.DataFrame = pd.DataFrame(),
precision_sum_elements: float = 0.05,
precision_sum_functional_group: float = 0.05,
) -> pd.DataFrame:
"""
used to retrieve chemical properties of the compound indicated by the
comp_name and to store those properties in the df
Parameters
----------
GCname : str
name from GC, used as a unique key.
search_name : str
name to be used to search on pubchem.
df : pd.DataFrame
that contains all searched compounds.
df_class_code_frac : pd.DataFrame
contains the list of functional group names, codes to be searched
and the weight fraction of each one to automatically calculate the
mass fraction of each compounds for each functional group.
Classes are given as smarts and are looked into the smiles of the comp.
Returns
-------
df : pd.DataFrame
updated dataframe with the searched compound.
CompNotFound : str
if GCname did not yield anything CompNotFound=GCname.
"""

if not isinstance(df, pd.DataFrame):
raise TypeError("The argument df must be a pd.DataFrame.")

if not isinstance(comp_name, str) or comp_name.isspace():
return _order_columns_in_compounds_properties(df)

if comp_name in df.index.tolist():
return _order_columns_in_compounds_properties(df)

comp = get_compound_from_pubchempy(comp_name)

if comp is None:
df.loc[comp_name, "iupac_name"] = "unidentified"
return _order_columns_in_compounds_properties(df)

try:
valid_iupac_name = comp.iupac_name.lower()
except AttributeError: # iupac_name not give
valid_iupac_name = comp_name.lower()

df.loc[comp_name, "iupac_name"] = valid_iupac_name
df.loc[comp_name, "molecular_formula"] = comp.molecular_formula
df.loc[comp_name, "canonical_smiles"] = comp.canonical_smiles
df.loc[comp_name, "molecular_weight"] = float(comp.molecular_weight)

try:
df.loc[comp_name, "xlogp"] = float(comp.xlogp)
except (
TypeError
): # float() argument must be a string or a real number, not 'NoneType'
df.loc[comp_name, "xlogp"] = np.nan
elements = set(comp.to_dict()["elements"])
el_dict = {}
el_mf_dict = {}

for el in elements:
el_count = comp.to_dict()["elements"].count(el)
el_mass = ele.element_from_symbol(el).mass

# Using similar logic as in the fg_dict example
if el not in el_dict:
el_dict[el] = 0
el_mf_dict[el] = 0.0

el_dict[el] += int(el_count)
el_mf_dict[el] += (
float(el_count) * float(el_mass) / float(comp.molecular_weight)
)
# Now, update the DataFrame in a similar way to the fg_dict example
for key, value in el_dict.items():
df.at[comp_name, f"el_{key}"] = int(value)

for key, value in el_mf_dict.items():
df.at[comp_name, f"el_mf_{key}"] = float(value)
cols_el_mf = [col for col in df.columns if col.startswith("el_mf_")]
residual_els = df.loc[comp_name, cols_el_mf].sum() - 1
# check element sum
try:
assert residual_els <= precision_sum_elements
except AssertionError:
print(f"the total mass fraction of elements in {comp_name =} is > 0.001")
# apply fragmentation using the Fragmenter class (thanks simonmb)
frg = Fragmenter(
dict_classes_to_codes,
fragmentation_scheme_order=dict_classes_to_codes.keys(),
algorithm="simple",
)
fragmentation, _, _ = frg.fragment(comp.canonical_smiles)
fg_dict = {}
fg_mf_dict = {}
# Iterate over each item in the dictionary
for key, value in fragmentation.items():
# Determine the root key (the part before an underscore, if present)
root_key = key.split("_")[0]
# if root_key in hetero_atoms:
# pass
# Check if the root key is in the sum_dict; if not, initialize it
if root_key not in fg_dict:
fg_dict[root_key] = 0
fg_mf_dict[root_key] = 0
# Add the value to the corresponding root key in the sum_dict
fg_dict[root_key] += int(fragmentation[key])
fg_mf_dict[root_key] += (
float(fragmentation[key])
* float(dict_classes_to_mass_fractions[key])
/ df.loc[comp_name, "molecular_weight"].astype(float)
) # mass fraction of total

# Update df with fg_dict
for key, value in fg_dict.items():
df.at[comp_name, f"fg_{key}"] = int(value) # Update the cell
# Update df with fg_mf_dict
for key, value in fg_mf_dict.items():
df.at[comp_name, f"fg_mf_{key}"] = float(value) # Update the cell
cols_fg_mf = [col for col in df.columns if col.startswith("fg_mf")]
residual_fgs = df.loc[comp_name, cols_fg_mf].sum() - 1
try:
assert residual_fgs <= precision_sum_functional_group
except AssertionError:
print(f"{df.loc[comp_name, cols_fg_mf].sum()=}")
print(
f"the total mass fraction of functional groups in {comp_name =} is > 0.05"
)
if residual_fgs < -precision_sum_functional_group:
df.at[comp_name, "fg_mf_unclassified"] = abs(residual_fgs)
df.loc[df["iupac_name"] != "unidentified"] = df.loc[
df["iupac_name"] != "unidentified"
].fillna(0)
return _order_columns_in_compounds_properties(df)
Loading

0 comments on commit 029c898

Please sign in to comment.