In [1]:
from HpoDatabase import HpoDatabase

# Paths handed to HpoDatabase: the database directory and the HPO ontology
# file. Both are relative to the notebook's working directory.
db_path, obo_path = 'hpo_chroma_db', 'hp.obo'
hpo_db = HpoDatabase(db_path, obo_path)

# Smoke test: a plain phenotype phrase should resolve to an HPO id and name.
query = "muscle weakness"
results = hpo_db.query_hpo(query)
hpo_id, hpo_name = hpo_db.parse_results(results)
print(f"{hpo_id} {hpo_name}")

HP:0001324 Muscle weakness


In [2]:
# Load all synonyms generated by GPT-4o-mini.
# NOTE(review): this is an absolute, machine-specific path; adjust it to the
# local copy of the synonym folder before running.
synonym_folder = '/Users/cl3720/Desktop/hpo-parser-fine-tuned-gemini/hpo_synonyms/gpt-4o-mini-2024-07-18'
import os
import json


def load_synonym_files(folder):
    """Merge every synonym file found directly inside ``folder`` into one dict.

    Each file is expected to be JSON of the form
    ``{"HP:XXXX": ["synonym1", "synonym2", "synonym3"]}``.

    Hidden entries (names starting with ".", e.g. ``.DS_Store``) and
    sub-directories are skipped so they cannot break ``json.load``. Files are
    read in sorted name order so that, when the same HPO id appears in more
    than one file, the winning entry is reproducible between runs.

    Args:
        folder: Path of the directory that holds the synonym files.

    Returns:
        A dict mapping HPO id to its list of synonyms; empty when ``folder``
        is not an existing directory.
    """
    merged = {}
    if not os.path.isdir(folder):
        return merged
    for file_name in sorted(os.listdir(folder)):
        file_path = os.path.join(folder, file_name)
        if file_name.startswith(".") or not os.path.isfile(file_path):
            continue
        with open(file_path, "r", encoding="utf-8") as f:
            merged.update(json.load(f))
    return merged


expanded_hpo_name_dict = load_synonym_files(synonym_folder)
print(f"There are {len(expanded_hpo_name_dict)} HPO terms with synonyms")

There are 18330 HPO terms with synonyms


In [28]:
# Query the database with every synonym and store the HPO id it resolves to.
import random

# Fraction of synonyms that are actually queried. random.random() returns a
# value in [0.0, 1.0), so 1.0 means "query every synonym" (the original
# threshold check could never skip anything either).
# Original author's timing note: ~5 minutes per 500 HPO terms, i.e. ~0.6 s per
# term, so a full run over ~18000 terms takes about 3 hours.
# NOTE(review): a later cell explodes the synonym list and the hit list side by
# side, which needs exactly one hit entry per synonym; lowering this rate
# breaks that alignment unless the downstream code is adapted.
SYNONYM_SAMPLE_RATE = 1.0

query_results = {}
for i, (hp_id, synonyms) in enumerate(expanded_hpo_name_dict.items(), start=1):
    if i % 1000 == 0:
        print(f"Processed {i} HPO terms")

    # Register the list up front so partial results survive an interruption.
    query_results[hp_id] = hit_ids = []
    for synonym in synonyms:
        if random.random() >= SYNONYM_SAMPLE_RATE:
            continue
        results = hpo_db.query_hpo(synonym)
        hit_hpo_id, _ = hpo_db.parse_results(results)
        if not hit_hpo_id:
            # Keep a placeholder so hits stay positionally aligned with synonyms.
            print(f"No HPO id found for {synonym}")
            hit_hpo_id = None
        hit_ids.append(hit_hpo_id)

No HPO id found for 3.0 LogMAR pinhole test


In [5]:
# The query loop is slow, so its output is cached on disk as JSON.
# To refresh the cache after re-running the queries, uncomment:
# with open('rag_evaluation_query_results.json', 'w') as f:
#     json.dump(query_results, f)

# Restore the cached query results (this replaces any in-memory query_results).
with open('rag_evaluation_query_results.json', 'r') as cached_results_file:
    query_results = json.loads(cached_results_file.read())

In [6]:
import pandas as pd

# Combine the synonym lists and the retrieved ids, keyed by the HPO id the
# synonyms were generated for.

# One row per HPO term: its id and the list of synonyms.
expanded_hpo_name_df = pd.DataFrame(
    list(expanded_hpo_name_dict.items()),
    columns=['query_hpo_id', 'synonyms'],
)
# One row per HPO term: its id and the list of ids the synonyms resolved to.
query_results_df = pd.DataFrame(
    list(query_results.items()),
    columns=['query_hpo_id', 'hit_hpo_ids'],
)
# Join on the term id, then unpack both list columns in lockstep so every row
# holds a single (synonym, hit id) pair.
merged_df = (
    expanded_hpo_name_df
    .merge(query_results_df, on='query_hpo_id')
    .explode(['synonyms', 'hit_hpo_ids'])
)

In [7]:
# Turn off DeprecationWarning. The filter is process-wide, so it is kept here
# for any other cell that still relies on it.
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

# A synonym is an exact hit when the retrieved id equals the id the synonym
# was generated for. Missing hits (None/NaN) compare as False.
exact_hit_df = merged_df.assign(
    is_exact_hit=(merged_df.query_hpo_id == merged_df.hit_hpo_ids).to_numpy()
)

# Overall accuracy: share of synonyms that retrieve their own HPO id.
overall_accuracy = exact_hit_df.is_exact_hit.mean()
print(f"Overall accuracy: {overall_accuracy}")

# Number of exact hits per query term, computed once and reused for every
# threshold instead of re-running a groupby.apply per threshold.
exact_hits_per_term = exact_hit_df.groupby('query_hpo_id')['is_exact_hit'].sum()

# Share of query terms with at least i exact hits among their synonyms.
for at_least_i_hit in range(1, 6):
    at_least_x_hit_accuracy = (exact_hits_per_term >= at_least_i_hit).mean()
    print(f"At least {at_least_i_hit} hit accuracy: {at_least_x_hit_accuracy}")


Overall accuracy: 0.41828696126568465
At least 1 hit accuracy: 0.7346426623022367
At least 2 hit accuracy: 0.5433169667212221
At least 3 hit accuracy: 0.3880523731587561
At least 4 hit accuracy: 0.26677577741407527
At least 5 hit accuracy: 0.15864702673213313


In [8]:
# Load the HPO ontology through the project's HpoFactory helper.
from HpoFactory import HpoFactory

hpoF = HpoFactory()
hpo_tree = hpoF.build_hpo_tree()
# Both lookups are derived from the same tree object.
# NOTE(review): presumably these map each HPO id to its ancestors / its depth;
# confirm against HpoFactory.
hpo_ancestors, hpo_levels = (
    hpoF.get_hpo_ancestors(hpo_tree),
    hpoF.get_hpo_levels(hpo_tree),
)

In [9]:
# Lookup table keyed by the retrieved term: each row pairs an HPO id (named
# 'hit_hpo_ids' so it can be joined against the query results) with a list
# holding the term itself followed by its ancestors.
hpo_ancestors_df = pd.DataFrame(
    {
        'hit_hpo_ids': list(hpo_ancestors.keys()),
        'ancestor': [
            [term_id] + term_ancestors
            for term_id, term_ancestors in hpo_ancestors.items()
        ],
    }
)

In [141]:
# Accuracy counted by ancestors.
# Attach to every (synonym, hit) row the ancestor list of the *retrieved* term.
# The join key is spelled out so the merge does not depend on which columns
# the two frames happen to share. how='right' keeps every query row; hits
# without an ancestor entry (including missing hits) get NaN in 'ancestor'.
merged_ancestor_df = hpo_ancestors_df.merge(merged_df, how='right', on='hit_hpo_ids')

# True when the query term appears in the retrieved term's ancestor list.
# The isinstance guard skips the NaN placeholders, which would otherwise raise
# "TypeError: argument of type 'float' is not iterable".
merged_ancestor_df['query_is_ancestor'] = [
    isinstance(ancestors, (list, set)) and query_id in ancestors
    for ancestors, query_id in zip(
        merged_ancestor_df['ancestor'], merged_ancestor_df['query_hpo_id']
    )
]

# Accuracy by ancestors: share of synonyms whose hit has the query term in its
# ancestor list.
ancestor_accuracy = merged_ancestor_df.query_is_ancestor.mean()
print(f"Accuracy by ancestors: {ancestor_accuracy}")

# Ancestor-aware hits per query term, computed once for all thresholds.
ancestor_hits_per_term = merged_ancestor_df.groupby('query_hpo_id')['query_is_ancestor'].sum()

# Share of query terms with at least i ancestor-aware hits among their synonyms.
for at_least_i_hit in range(1, 6):
    at_least_x_hit_accuracy = (ancestor_hits_per_term >= at_least_i_hit).mean()
    print(f"At least {at_least_i_hit} hit accuracy by ancestors: {at_least_x_hit_accuracy}")

Accuracy by ancestors: 0.524
At least 1 hit accuracy by ancestors: 0.88
At least 2 hit accuracy by ancestors: 0.69
At least 3 hit accuracy by ancestors: 0.52
At least 4 hit accuracy by ancestors: 0.34
At least 5 hit accuracy by ancestors: 0.19


In [None]:
# Compare with string based search
