In [1]:
import csv
import json
import os
from pprint import pprint

import diskcache as dc
import jsonlines
import requests
from deepl import Translator
from pydantic import BaseModel
from tqdm import tqdm

from config import TLCPaths
from data_loading import load_tlc_samples, get_annotation_ids
from models import Match, SampleCollection, Annotation, ProdigySample, ProdigyNERLabel

In [2]:
# Build the sample lookup collection and a flat list of every annotation of every sample.
# NOTE(review): load_tlc_samples() is called twice here; if it is expensive, consider
# loading once and reusing the result — confirm its semantics in data_loading first.
sample_collection = SampleCollection(load_tlc_samples())
annotations = [ann for sample in load_tlc_samples() for ann in sample.annotations]

def get_annotation_by_id(id):
    """Return the first entry of the module-level ``annotations`` list whose ``id`` matches.

    Returns ``None`` when no annotation carries the requested id.
    """
    return next((candidate for candidate in annotations if candidate.id == id), None)

# Lookup keyed by search-term stem; the cells below index it with sample["stem"] and use
# the first id of the returned sequence.
# JSON is UTF-8 by specification, so decode explicitly instead of relying on the
# platform-dependent default encoding.
with open('search_terms_single_and_ids.json', 'r', encoding='utf-8') as fp:
    terms_and_ids = json.load(fp)

In [3]:
annotations_file_path = TLCPaths.project_data_path.joinpath("samples_to_annotate.json")
validations_file_path = TLCPaths.project_data_path.joinpath("samples_to_validate.json")
# Load the two JSON files. The encoding is given explicitly because the platform default
# is not guaranteed to be UTF-8; list(...) replaces the identity comprehension.
with open(annotations_file_path, "r", encoding="utf-8") as f:
    annotation_samples = list(json.load(f))
with open(validations_file_path, "r", encoding="utf-8") as f:
    validation_samples = list(json.load(f))

print(f"{len(annotation_samples)=}")
print(f"{len(validation_samples)=}")


len(annotation_samples)=326
len(validation_samples)=672


In [4]:
# Resolve each validation sample to the annotation behind the first id registered for its
# stem, and remember the sample's "cui" value under that annotation's id.
validation_annotations = []
validation_cuis = {}
for sample in validation_samples:
    first_id = terms_and_ids[sample["stem"]][0]
    resolved = get_annotation_by_id(first_id)
    validation_annotations.append(resolved)
    validation_cuis[resolved.id] = sample["cui"]

In [5]:
# For every sample still to be annotated, fetch the annotation behind the first id
# registered for its stem.
annotation_annotations = [
    get_annotation_by_id(terms_and_ids[sample["stem"]][0])
    for sample in annotation_samples
]

In [6]:
# Translate mentions and synonyms to English.
# Use the mention and the synonym to find a CUI. If they disagree, take the mention for a tech term or the synonym for a lay term.
# Query UMLS for a German description. If none is available, take the English one and translate it back to German.

In [7]:
class TranslatedTerm(BaseModel):
    # A translated term together with the CUI looked up for it.
    name: str  # the translated text
    cui: str = ""  # stays "" until a CUI lookup fills it in


class TranslatedAnnotation(BaseModel):
    # Bundles an annotation with the translations of its mention and of one synonym.
    annotation: Annotation
    translated_mention: TranslatedTerm
    translated_synonym: TranslatedTerm  # built with name="" when the annotation has no synonyms


class CachedTranslator:
    """Wrapper around a DeepL ``Translator`` that stores every result in ``cache``.

    Results are keyed by the ``(text, source_lang, target_lang)`` tuple, so a repeated
    request is answered from the cache without calling the translator again.
    """

    def __init__(self, cache, api_key):
        self.cache = cache
        self.translator = Translator(api_key)

    def translate_text(self, text, source_lang, target_lang):
        """Return the translation result for ``text``, consulting the cache first."""
        cache_key = (text, source_lang, target_lang)
        if cache_key in self.cache:
            return self.cache[cache_key]
        # Cache miss: ask the translator, remember the answer, and hand it back.
        fresh = self.translator.translate_text(text, source_lang=source_lang,
                                               target_lang=target_lang)
        self.cache[cache_key] = fresh
        return fresh


deepl_cache = dc.Cache("caches/deepl_cache")
# Credentials must not live in the notebook: the DeepL key is read from the
# DEEPL_API_KEY environment variable (a KeyError here means it is not set).
# The key that used to be hard-coded at this spot should be treated as leaked and rotated.
deepl_key = os.environ["DEEPL_API_KEY"]
translator = CachedTranslator(deepl_cache, deepl_key)

In [8]:
# Smoke test of the cached translator: German -> American English.
translator.translate_text("Krankenhausmauer", source_lang="DE", target_lang="EN-US").text

'Hospital Wall'

In [9]:
translated_annotations = []
for annotation in tqdm(annotation_annotations):
    # The lower-cased mention is always translated; the first synonym only if one exists.
    mention_en = translator.translate_text(
        annotation.get_mention().lower(), source_lang="DE", target_lang="EN-US").text
    synonym_en = ""
    if len(annotation.synonyms) > 0:
        synonym_en = translator.translate_text(
            annotation.synonyms[0].lower(), source_lang="DE", target_lang="EN-US").text
    translated_annotations.append(
        TranslatedAnnotation(
            annotation=annotation,
            translated_mention=TranslatedTerm(name=mention_en),
            translated_synonym=TranslatedTerm(name=synonym_en),
        )
    )

100%|██████████| 326/326 [00:00<00:00, 4681.72it/s]


In [10]:
# Inspect the last translated annotation as a sanity check (CUIs are not filled in yet).
translated_annotations[-1]

TranslatedAnnotation(annotation=Annotation(tech_term=None, lay_term='Harnröhreninfekt', type=<TermType.LAY: 'LAY'>, span_start=927, span_end=943, synonyms=['Bakterielle Urethritis', 'Bakterielle Urethritis'], id=2742), translated_mention=TranslatedTerm(name='urethral infection', cui=''), translated_synonym=TranslatedTerm(name='bacterial urethritis', cui=''))

In [11]:
# Credentials must not live in the notebook: the UMLS (UTS) API key is read from the
# UMLS_API_KEY environment variable (a KeyError here means it is not set).
# The key that used to be hard-coded at this spot should be treated as leaked and rotated.
UMLS_KEY = os.environ["UMLS_API_KEY"]

# Disk-backed cache used to memoize the UMLS lookups below.
umls_cache = dc.Cache("caches/umls_cache")


@umls_cache.memoize()
def get_cui(name):
    """Return the ``ui`` of the first UMLS search hit for ``name``, or ``""`` if there is none.

    The UTS search endpoint is queried with ``searchType=normalizedString``. An empty
    string is also returned (after printing the payload) when the response carries no
    ``"result"`` key. Results, including empty ones, are memoized in ``umls_cache``.

    Raises whatever ``requests`` raises on connection problems or when the 30 s timeout
    expires.
    """
    if not name:
        # Nothing to search for (e.g. an annotation without synonyms): skip the request.
        return ""
    # Pass the query through ``params`` so requests percent-encodes it; interpolating the
    # raw term into the URL breaks for names containing "&", "#", "+" and similar.
    response = requests.get(
        "https://uts-ws.nlm.nih.gov/rest/search/current",
        params={"apiKey": UMLS_KEY, "string": name, "searchType": "normalizedString"},
        timeout=30,  # without a timeout a stalled connection blocks the notebook forever
    )
    payload = response.json()  # parse once instead of on every access
    if "result" not in payload:
        print("name: ", name, payload)
        return ""
    results = payload["result"]["results"]
    return results[0]["ui"] if results else ""


# Smoke test of the CUI lookup with a well-known term.
cui = get_cui(name="Pyelonephritis")
print(cui)

C0034186


In [12]:
for translated_annotation in tqdm(translated_annotations):
    # Look up the mention first, then the synonym; terms that already carry a CUI are
    # left untouched.
    for term in (translated_annotation.translated_mention,
                 translated_annotation.translated_synonym):
        if term.cui == "":
            term.cui = get_cui(name=term.name)


100%|██████████| 326/326 [00:00<00:00, 8091.84it/s]


In [13]:
annotations_with_match = []  # entry = (annotation, translated_name, cui)
annotation_without_match = []  # entry = (annotation)
for translated in tqdm(translated_annotations):
    mention_term = translated.translated_mention
    synonym_term = translated.translated_synonym
    if not mention_term.cui and not synonym_term.cui:
        # Neither lookup produced a CUI.
        annotation_without_match.append(translated)
        continue
    if mention_term.cui and synonym_term.cui:
        # Both produced a CUI: prefer the mention for TECH annotations, else the synonym.
        # NOTE(review): this relies on the annotation type comparing equal to the plain
        # string "TECH" — confirm against the TermType definition.
        chosen = mention_term if translated.annotation.type == "TECH" else synonym_term
    elif mention_term.cui:
        chosen = mention_term
    else:
        chosen = synonym_term
    annotations_with_match.append((translated.annotation, chosen.name, chosen.cui))

100%|██████████| 326/326 [00:00<00:00, 501777.29it/s]


In [14]:
# found cuis for 203 annotations and not for 123 annotations

In [15]:
# Number of annotations with a CUI, followed by the first two (annotation, name, cui) entries.
print(len(annotations_with_match))
print(annotations_with_match[:2])

203
[(Annotation(tech_term='Gallensteine', lay_term=None, type=<TermType.TECH: 'TECH'>, span_start=121, span_end=133, synonyms=['Cholelith', 'Cholelith'], id=6202), 'gallstones', 'C0008350'), (Annotation(tech_term=None, lay_term='Magen Darm Grippe', type=<TermType.LAY: 'LAY'>, span_start=9, span_end=26, synonyms=['Norovirus-Gastroenteritis', 'Norovirus-Gastroenteritis'], id=5687), 'norovirus gastroenteritis', 'C2242683')]


In [16]:
# Number of annotations left without any CUI, followed by the first such entry.
print(len(annotation_without_match))  # no cui for mention or synonym found
print(annotation_without_match[0])

123
annotation=Annotation(tech_term='Defiblirator', lay_term=None, type=<TermType.TECH: 'TECH'>, span_start=1163, span_end=1175, synonyms=['Schockgeber', 'Schockgeber'], id=811) translated_mention=TranslatedTerm(name='defiblirator', cui='') translated_synonym=TranslatedTerm(name='shock generator', cui='')


In [17]:
# Show mention and first synonym (original text next to its translation) for the first
# five annotations without a CUI.
for unmatched in annotation_without_match[:5]:
    source_annotation = unmatched.annotation
    print(source_annotation.get_mention(), unmatched.translated_mention)
    if source_annotation.synonyms:
        print(source_annotation.synonyms[0], unmatched.translated_synonym)
    print()

Defiblirator name='defiblirator' cui=''
Schockgeber name='shock generator' cui=''

stopfende name='stuffing' cui=''
obstipierend name='obstipating' cui=''

Teufelskreis name='vicious circle' cui=''
Circulus vitiosus name='circulus vitiosus' cui=''

schlimme Rückenschmerzen in der Höhe der Nieren name='bad back pain at the level of the kidneys' cui=''
Symptome des oberen Urogenitaltraktes name='symptoms of the upper urogenital tract' cui=''

Schmerzen an den Nieren und 3 fach erhöhte Entzündungswerte im Urin name='pain in the kidneys and 3 times increased inflammation values in the urine' cui=''
Infektion des oberen Urogenitaltraktes name='infection of the upper urogenital tract' cui=''



In [18]:
# Export the annotations without a CUI match, leaving the "cui" column empty
# (presumably to be filled in by hand — the file is named "to_annotate").
# newline="" is what the csv module requires for files passed to csv.writer (otherwise
# blank rows appear on Windows); the explicit encoding keeps non-ASCII text portable.
with open("to_annotate_strict.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["mention", "synonym", "cui"])
    for annotation in annotation_without_match:
        writer.writerow([annotation.annotation.get_mention(), annotation.annotation.synonyms, ""])

In [19]:
# Keep every unmatched annotation except those whose mention is "Teufelskreis";
# the dropped ones are printed.
filtered_anns = []
for candidate in annotation_without_match:
    if candidate.annotation.get_mention() != "Teufelskreis":
        filtered_anns.append(candidate)
    else:
        print(candidate.annotation)

tech_term=None lay_term='Teufelskreis' type=<TermType.LAY: 'LAY'> span_start=753 span_end=765 synonyms=['Circulus vitiosus', 'Circulus vitiosus'] id=3898


In [20]:
# annotation_without_match = filtered_anns

In [21]:
# ## read in annotations from csv
# with open("to_annotate.copy.csv", "r") as f:
#     reader = csv.reader(f)
#     next(reader)
#     manual_annotations = []
#     for annotation, row in zip(annotation_without_match, reader):
#         mention = row[0]
#         synonym = row[1]
#         cui = row[2]
#         assert annotation.annotation.get_mention() == mention
#         manual_annotations.append((annotation, cui))
#         
# manual_annotations[0]

In [22]:
# No manual annotations in this run: the CSV import that would fill this list is commented out.
manual_annotations = []

In [23]:
# Merge the three sources into one list of {"annotation", "cui"} dicts.
all_annotations = []
# Automatically matched entries are (annotation, translated_name, cui) tuples.
all_annotations.extend(
    {"annotation": entry[0], "cui": entry[2]} for entry in tqdm(annotations_with_match))
# Manual entries pair a translated annotation with a cui.
all_annotations.extend(
    {"annotation": entry[0].annotation, "cui": entry[1]} for entry in tqdm(manual_annotations))
# Validation annotations use the first element of the value stored in validation_cuis.
all_annotations.extend(
    {"annotation": ann, "cui": validation_cuis[ann.id][0]} for ann in tqdm(validation_annotations))

100%|██████████| 203/203 [00:00<00:00, 768451.00it/s]
0it [00:00, ?it/s]
100%|██████████| 672/672 [00:00<00:00, 485040.83it/s]


In [24]:
# The 123 annotations without a CUI match are not included because manual_annotations is empty.
print(len(all_annotations)) # 123 missing from manual annotation

875


# add descriptions

In [25]:
# NOTE(review): this opens the same "caches/umls_cache" directory that umls_cache was
# already bound to in the get_cui cell; the reassignment looks redundant — confirm
# before removing it.
umls_cache = dc.Cache("caches/umls_cache")

@umls_cache.memoize()
def get_cui_name(cui):
    """Return the German translation of the name UMLS reports for ``cui``.

    The concept is fetched from the UTS content endpoint and its ``name`` is translated
    from English to German. Returns ``None`` (after printing a notice) when the response
    has no ``"result"`` key. Results are memoized in ``umls_cache``.
    """
    query_url = f"https://uts-ws.nlm.nih.gov/rest/content/current/CUI/{cui}?apiKey={UMLS_KEY}"
    payload = requests.get(query_url).json()
    if "result" in payload:
        english_name = payload["result"]["name"]
        return translator.translate_text(text=english_name, source_lang="EN",
                                         target_lang="DE").text
    print("returning none for ", cui)
    return None
    
@umls_cache.memoize()
def get_description(cui):
    """Return a German description for ``cui`` in the form ``"<name>: <definition>"``.

    The definition is chosen from the UMLS definitions of the concept in this order:
    1. the first definition whose ``rootSource`` is one of the German vocabularies;
    2. the first definition from one of the listed English vocabularies, translated
       from English to German;
    3. the very first definition, translated to German with source-language detection.

    When UMLS returns no definitions (no ``"result"`` key or an empty list), only the
    name from ``get_cui_name`` is returned, which may be ``None``. When the name lookup
    failed but a definition exists, the definition is returned without a name prefix.
    Results are memoized in ``umls_cache``.
    """
    name = get_cui_name(cui=cui)

    ger_sabs = ["DMDICD10", "DMDUMD", "WHOGER", "ICPCGER", "LNC-DE-AT", "LNC-DE-DE", "MDRGER",
                "MSHGER"]
    eng_sabs = ["HPO", "MDR", "MSH", "SNOMEDCT_US"]

    query_url = f"https://uts-ws.nlm.nih.gov/rest/content/current/CUI/{cui}/definitions?apiKey={UMLS_KEY}"
    payload = requests.get(query_url).json()  # parse once instead of on every access
    definitions = payload.get("result")
    if not definitions:
        # Covers a missing "result" key as well as an empty list, which used to raise
        # IndexError in the last-resort branch.
        return name

    def _with_name(text):
        # Avoid a literal "None: " prefix when the name lookup failed.
        return text if name is None else f"{name}: " + text

    for definition in definitions:
        if definition["rootSource"] in ger_sabs:
            return _with_name(definition["value"])
    for definition in definitions:
        if definition["rootSource"] in eng_sabs:
            return _with_name(translator.translate_text(text=definition["value"],
                                                        source_lang="EN",
                                                        target_lang="DE").text)
    # Last resort: the first definition of whatever source, language auto-detected.
    return _with_name(translator.translate_text(text=definitions[0]["value"],
                                                source_lang=None,
                                                target_lang="DE").text)


In [26]:
# Attach a description to every entry (the dicts are updated in place).
for entry in tqdm(all_annotations):
    entry.update(description=get_description(cui=entry["cui"]))

100%|██████████| 875/875 [00:00<00:00, 17959.81it/s]


In [27]:
# Print every entry for which no description could be produced.
for entry in tqdm(all_annotations):
    if entry["description"] is not None:
        continue
    print(entry)

100%|██████████| 875/875 [00:00<00:00, 1871502.29it/s]

{'annotation': Annotation(tech_term=None, lay_term='Langzeitzuckerwert', type=<TermType.LAY: 'LAY'>, span_start=1054, span_end=1072, synonyms=['HbA1c - Glykohämoglobin, Der Spiegel im Blut gibt Auskunft über die Blutzuckerwerte der letzten vier bis zwölf Wochen', 'HbA1c'], id=4619), 'cui': 'C0373638', 'description': None}
{'annotation': Annotation(tech_term=None, lay_term='Nierenschädlichkeit', type=<TermType.LAY: 'LAY'>, span_start=36, span_end=55, synonyms=['Nephrotoxizität', 'Nephrotoxizität'], id=1949), 'cui': 'C0599918', 'description': None}
{'annotation': Annotation(tech_term='Bandscheibenvorfall', lay_term=None, type=<TermType.TECH: 'TECH'>, span_start=107, span_end=126, synonyms=['Bandscheibenprolaps', 'Bandscheibenprolaps'], id=3931), 'cui': 'C0242362', 'description': None}





In [28]:
# Split the entries by whether a description is available; only those with one go on.
no_desc_ann = []
all_annotations_with_desc = []
for entry in all_annotations:
    if entry["description"] is None:
        no_desc_ann.append(entry)
    else:
        all_annotations_with_desc.append(entry)
print(len(no_desc_ann))

3


In [29]:
# parse annotations to prodigy samples
from data_loading import annotations_cache


In [31]:
# Turn every described annotation into a ProdigySample: the sample text, one span for the
# annotated mention, the CUI, the description as html, and the ids returned by
# get_annotation_ids for the mention.
prodigy_samples = []
for entry in tqdm(all_annotations_with_desc):
    source_annotation = entry["annotation"]
    prodigy_samples.append(
        ProdigySample(
            spans=[ProdigyNERLabel(start=source_annotation.span_start,
                                   end=source_annotation.span_end)],
            text=sample_collection.get_sample_by_annotation_id(source_annotation.id).text,
            annotation_ids=get_annotation_ids(source_annotation.get_mention()),
            id=source_annotation.id,
            cui=entry["cui"],
            html=entry["description"],
        )
    )
    

100%|██████████| 872/872 [05:32<00:00,  2.62it/s]


In [32]:
# Write one JSON line per Prodigy sample.
output_path = TLCPaths.project_data_path.joinpath("prodigy_samples_strict.jsonl")
with jsonlines.open(output_path, "w") as fp:
    for prodigy_sample in prodigy_samples:
        fp.write(prodigy_sample.dict())