In [1]:
from utility import request_json, SOURCE_DIR, GENERATED_DIR
import requests
import rdflib
from tqdm import tqdm

import pandas as pd
tqdm.pandas()

from dotenv import load_dotenv, dotenv_values
load_dotenv()

True

### Check from Scicrunch

In [2]:
from utility import get_existing_term, get_term_from_label

In [3]:
### Load nerve list
df = pd.read_csv(SOURCE_DIR / 'nervesWithVagus_annotations.csv')

### Checking based on Term ID

In [12]:
if (fma_df_file := GENERATED_DIR/'fma_df.csv').exists():
    fma_df = pd.read_csv(fma_df_file)
else:
    fma_df = df[df['Term ID'].astype(str).str.startswith('FMA')]

In [13]:
### Query the missing one from server
fma_df['available'] = fma_df.progress_apply(
    lambda row: get_existing_term(row['Term ID']) if pd.isna(row['available']) else row['available'],
    axis=1
)

# ### Query all to server
# fma_df['available'] = fma_df['Term ID'].progress_apply(get_existing_term)

100%|██████████| 786/786 [10:37<00:00,  1.23it/s]


In [14]:
fma_df.to_csv(GENERATED_DIR / 'fma_df.csv', index=False)

### Checking those without Term ID

In [15]:
null_df = df[df['Term ID'].isnull()]
null_df['available'] = null_df['Group name'].progress_apply(get_term_from_label)

100%|██████████| 134/134 [01:34<00:00,  1.42it/s]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  null_df['available'] = null_df['Group name'].progress_apply(get_term_from_label)


In [16]:
null_df[null_df['available'].str.len() > 0].shape

(60, 4)

In [17]:
null_df.to_csv(GENERATED_DIR / 'null_df.csv', index=False)

### Check from uberon, using hasDbXref

In [18]:
### Dowload uberon.owl
### You can also downoad manually and store it in 'data/source' directory

uberon_url = 'https://data.bioontology.org/ontologies/UBERON/submissions/351/download?apikey=8b5b7825-538d-40e0-9e9e-5ab9274a9aeb'
response = requests.get(uberon_url)
response.raise_for_status()
with open(SOURCE_DIR / 'uberon.owl', "wb") as f:
    f.write(response.content)

KeyboardInterrupt: 

In [19]:
## Load uberon to rdflib graph
g_uberon = rdflib.Graph()
g_uberon.parse(SOURCE_DIR / 'uberon.owl', format='xml')

<Graph identifier=N7538b30d60894ce38e88480896eb2808 (<class 'rdflib.graph.Graph'>)>

In [20]:
OBOINOWL = rdflib.Namespace("http://www.geneontology.org/formats/oboInOwl#")

def get_hasDbXref(term):
    for s in g_uberon.subjects(predicate=OBOINOWL.hasDbXref, object=rdflib.Literal(term)):
        if (s, rdflib.RDF.type, rdflib.OWL.Class) in g_uberon:
            return str(s).replace('http://purl.obolibrary.org/obo/UBERON_', 'UBERON:')
    return pd.NA

In [21]:
fma_df = fma_df.copy()
fma_df['available'] = fma_df['available'].fillna(fma_df['Term ID'].apply(get_hasDbXref))

### Now check superclass and superbranch
This is not the match but candidate to check

And this take a lot of time, be patient

In [14]:
### Dowload fma.owl
### You can also downoad manually and store it in 'data/source' directory

fma_url = 'https://data.bioontology.org/ontologies/FMA/submissions/29/download?apikey=8b5b7825-538d-40e0-9e9e-5ab9274a9aeb'
response = requests.get(fma_url)
response.raise_for_status()
with open(SOURCE_DIR / 'fma.owl', "wb") as f:
    f.write(response.content)



In [22]:
g_fma = rdflib.Graph()
g_fma.parse(SOURCE_DIR /'fma.owl', format='xml')

<Graph identifier=Ne8adeb1408ad4290b7cf9bb4500f6a21 (<class 'rdflib.graph.Graph'>)>

In [23]:
FMA = rdflib.Namespace("http://purl.org/sig/ont/fma/fma")
RDFS = rdflib.Namespace("http://www.w3.org/2000/01/rdf-schema#")

def curie(uri):
    if isinstance(uri, rdflib.URIRef):
        uri = str(uri)
        if uri.startswith("http://purl.org/sig/ont/fma/fma"):
            return "FMA:" + uri.rsplit("fma", 1)[-1]
        elif uri.startswith("http://purl.obolibrary.org/obo/UBERON_"):
            return uri.replace('http://purl.obolibrary.org/obo/UBERON_', 'UBERON:')
        return uri
    return uri

def clean_literal(val):
    if isinstance(val, rdflib.Literal):
        if val.datatype and val.datatype.endswith("integer"):
            return int(val)
        elif val.datatype and val.datatype.endswith("float"):
            return float(val)
        return str(val)
    return val

def get_related_terms(term):
    query = f"""
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    PREFIX FMA: <http://purl.org/sig/ont/fma/fma>
    PREFIX owl: <http://www.w3.org/2002/07/owl#>

    SELECT ?superclass ?label ?rel (COUNT(?mid) AS ?level)
    WHERE {{
      {{
        # Subclass path
        {term} rdfs:subClassOf* ?mid .
        ?mid rdfs:subClassOf* ?superclass .
        BIND("subClassOf" AS ?rel)
      }}
      UNION
      {{
        # branch_of restriction path
        {term} rdfs:subClassOf* ?mid .
        ?mid rdfs:subClassOf* [
          owl:onProperty <http://purl.org/sig/ont/fma/branch> ;
          owl:someValuesFrom ?superclass
        ] .
        BIND("branch_of" AS ?rel)
      }}

      # Shared filters and info
      OPTIONAL {{ ?superclass rdfs:label ?label }}
      ?superclass a owl:Class .
      ?superclass rdfs:subClassOf* FMA:65132 .
    }}
    GROUP BY ?superclass ?label ?type ?rel
    ORDER BY ?level

    """

    rows = []
    for row in g_fma.query(query):
        superclass = curie(row.superclass)
        if superclass == 'FMA:65132':
            continue
        label = clean_literal(row.label)
        rel = clean_literal(row.rel)
        level = clean_literal(row.level)
        rows.append((superclass, label, rel, level))

    # return pd.DataFrame(rows, columns=["superclass", "label", "relation", "level"])
    return rows


In [24]:
### now get superclass or super branch of terms (WARNING: this will be slow)
candidate_terms = {}
for term in tqdm(fma_df[fma_df['available'].isna()]['Term ID']):
    candidate_terms[term] = get_related_terms(term)

100%|██████████| 432/432 [35:37<00:00,  4.95s/it]


In [25]:
### populate existing terms, so can be access several time
existing_terms = {}

# Flatten and deduplicate candidate terms
unique_rows = set(row for k_terms in candidate_terms.values() for row in k_terms)

for row in tqdm(unique_rows, desc="Checking existing terms"):
    if row[0] in existing_terms: continue
    ilx_terms = get_existing_term(row[0])
    if isinstance(ilx_terms, list) and len(ilx_terms) > 0:
        existing_terms[row[0]] = ilx_terms

Checking existing terms: 100%|██████████| 1211/1211 [29:32<00:00,  1.46s/it]


In [26]:
### then check from existing_terms
ct_df = pd.DataFrame(columns=['Term ID', 'ILX superclass', 'FMA superclass', 'superclass label', 'relation', 'level'])
missing_fmas = []
for term, k_terms in tqdm(candidate_terms.items()):
    for row in k_terms:
        new_row = []
        if (ilx_terms:=existing_terms.get(row[0])):
            new_row = {
                'Term ID': term,
                'ILX superclass': ilx_terms,
                'FMA superclass': row[0],
                'superclass label': row[1],
                'relation': row[2],
                'level': row[3]
            }
            ct_df = pd.concat([ct_df, pd.DataFrame([new_row])], ignore_index=True)
            break
        if not new_row:
            missing_fmas += [term]

100%|██████████| 429/429 [00:00<00:00, 9912.17it/s]


In [27]:
## Final step, for missing FMA terms check to UBERON

OBOINOWL = rdflib.Namespace("http://www.geneontology.org/formats/oboInOwl#")

for term in missing_fmas:
    for row in candidate_terms[term]:
        for s in g_uberon.subjects(predicate=OBOINOWL.hasDbXref, object=rdflib.Literal(curie)):
            if (s, rdflib.RDF.type, rdflib.OWL.Class) in g_uberon:
                new_row = {
                    'Term ID': term,
                    'ILX superclass': [curie[s]],
                    'FMA superclass': row[0],
                    'superclass label': row[1],
                    'relation': row[2],
                    'level': row[3]
                }
                ct_df = pd.concat([ct_df, pd.DataFrame([new_row])], ignore_index=True)
                break  # break inner 'row' loop
        else:
            continue  # only runs if no break: keep checking rows
        break

### Now combine all

In [28]:
df_merged = df.merge(fma_df[['Term ID', 'available']], on='Term ID', how='left')

df_merged = df_merged.merge(null_df[['Group name', 'available']].rename(columns={'available': 'group_available'}),
                            on='Group name', how='left')

df_merged['available'] = df_merged['available'].fillna(df_merged['group_available'])

df_merged.drop(columns=['group_available'], inplace=True)


final_df = df_merged.merge(
    ct_df,
    on='Term ID',
    how='left'  # or 'outer' if you want to preserve all terms from both
)

In [39]:
### assign Term ID in final_df if available
import numpy as np

def pick_term_id(avail_list):
    if not isinstance(avail_list, list):
        return avail_list
    for item in avail_list:
        if isinstance(item, str) and item.startswith('UBERON'):
            return item
    for item in avail_list:
        if isinstance(item, str) and item.startswith('ILX'):
            return item
    return avail_list[0]

# Only update where 'Term ID' is NaN
final_df['Term ID'] = final_df.apply(
    lambda row: pick_term_id(row['available']) if pd.isna(row['Term ID']) else row['Term ID'],
    axis=1
)

In [42]:
final_df = final_df.rename(columns={'Unnamed: 2': 'Note'})

In [43]:
final_df.drop_duplicates(subset=['Term ID', 'Group name']).to_csv(GENERATED_DIR / 'mapped_fma_nerves.csv')