# core

> Contains functions to read and parse information from the Chia dataset

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *
from fastcore.test import *

In [None]:
#| export
import pandas as pd
import numpy as np

from pathlib import Path
from typing import List, Tuple, Callable

In [None]:
#| hide
!cat data/annotation.conf

[entities]
!CONCEPTS
	Scope
	Person
	Condition
	Drug
	Observation
	Measurement
	Procedure
	Device
	Visit
!ANNOTATION
	Negation
	Qualifier
	Temporal
	Value
	Multiplier
	Reference_point
	Line
	Mood
!ERROR
	Non-query-able
	Post-eligibility
	Pregnancy_considerations
	Competing_trial
	Informed_consent
	Intoxication_considerations
	Non-representable

[events]

[relations]
h-OR   Arg1:<ENTITY>, Arg2:<ENTITY>, <REL-TYPE>:symmetric-transitive
v-AND Arg1:<ANY>, Arg2:<ANY>
v-OR Arg1:<ANY>, Arg2:<ANY>
multi Arg1:<ANY>, Arg2:<ANY>
<OVERLAP>	Arg1:<ANY>, Arg2:<ANY>, <OVL-TYPE>:<ANY>

[attributes]
Optional   Arg:<ANY>
 

In [None]:
#| export
def load_eligibility_criteria() -> pd.DataFrame:
    """Load all eligibility-criteria files found in ``data/`` into a DataFrame.

    Every ``data/*_inc.txt`` and ``data/*_exc.txt`` file yields one row with:

    * ``ct_no``    -- file name without the ``_inc.txt`` / ``_exc.txt`` suffix
    * ``criteria`` -- file content with its lines joined by single spaces
    * ``mode``     -- ``"inclusion"`` or ``"exclusion"``
    * one column per key of ``ent_map`` holding the list returned by
      ``extract_entities`` for the matching ``.ann`` file, or ``None`` when
      that list is empty

    Returns:
        A DataFrame with one row per criteria file (empty when no file matches).
    """
    # Column name -> entity label exactly as it is spelled in the .ann files.
    ent_map = {
        "drugs": "Drug",
        "persons": "Person",
        "procedures": "Procedure",
        "conditions": "Condition",
        "devices": "Device",
        "visits": "Visit",
        "scopes": "Scope",
        "observations": "Observation",
        "measurements": "Measurement",
    }

    records = []

    for mode in ["_inc", "_exc"]:
        suffix = f"{mode}.txt"

        for path in Path("data").glob(f"*{suffix}"):
            # Cut the suffix off the bare file name. str.lstrip()/rstrip() remove
            # *sets of characters*, not a prefix/suffix, so they can eat into the
            # identifier itself and are not used here.
            clinical_trial_no = path.name[: -len(suffix)]

            with open(path, "rt", encoding="utf-8") as fh:
                criteria = " ".join(fh.read().splitlines())

            rec = {
                "ct_no": clinical_trial_no,
                "criteria": criteria,
                "mode": "inclusion" if mode == "_inc" else "exclusion",
            }

            for column, ent_type in ent_map.items():
                ents = extract_entities(clinical_trial_no, mode, ent_type)
                # Empty lists become None so that missing entities show up as NaN.
                rec[column] = ents if ents else None

            records.append(rec)

    return pd.DataFrame(records)

In [None]:
#| export
def extract_entities(ct: str, mode: str, e: str) -> List[str]:
    """Return the texts of all annotations of type ``e`` in ``data/{ct}{mode}.ann``.

    Args:
        ct: clinical trial identifier (file name stem without the mode suffix)
        mode: ``"_inc"`` or ``"_exc"``
        e: annotation type to extract, e.g. ``"Drug"``

    Returns:
        The annotated texts, whitespace-normalised, in file order.

    A row is selected only when its second whitespace-separated token (the
    annotation type) equals ``e``. Testing ``e in row`` would also pick up rows
    of other types whose text merely contains the label.
    """
    entities = []

    with open(f"data/{ct}{mode}.ann", "rt", encoding="utf-8") as f:
        rows = f.read().splitlines()

    for row in rows:
        tokens = row.split()
        if len(tokens) < 2 or tokens[1] != e:
            continue

        # Expected row layout: "<id>\t<type> <offsets>\t<text>". Taking the text
        # after the second tab keeps it intact even when the offsets consist of
        # several fragments ("0 5;8 12"), which a fixed token index would not.
        fields = row.split("\t", 2)
        if len(fields) == 3:
            text = fields[2]
        else:
            # Row without tab separators: fall back to skipping id, type and
            # the two offset tokens.
            text = " ".join(tokens[4:])

        entities.append(" ".join(text.split()))

    return entities

In [None]:
#| hide
# Smoke test: load the whole local data/ directory and check the frame's shape.
df = load_eligibility_criteria()
nrow, ncol = df.shape

# One row per criteria file; 2000 is the count expected for the local copy of the
# dataset (NOTE(review): presumably 1000 trials x inclusion/exclusion -- confirm).
test_eq(nrow, 2000)
# ct_no, criteria, mode + the nine entity columns defined in ent_map.
test_eq(ncol, 12)

In [None]:
#| export

from typing import Set

def jaccard_score(a: Set, b: Set, mode: str="strict") -> float:
    """Compute one of several variants of the Jaccard score between two sets.

    strict: |a & b| / |a | b|

    relaxed: |a & b| / min{|a|,|b|}

    left: |a & b| / |a|

    right: |a & b| / |b|

    Args:
        a: first set
        b: second set
        mode: one of "strict", "relaxed", "left", "right"

    Returns:
        The requested score, or 0. when either set is empty.

    Raises:
        ValueError: if ``mode`` is not one of the supported names.
    """
    # Reject unknown modes up front: falling through every branch would hand
    # the caller None instead of the promised float.
    if mode not in ("strict", "relaxed", "left", "right"):
        raise ValueError(
            f"Unknown mode {mode!r}; expected 'strict', 'relaxed', 'left' or 'right'"
        )

    # An empty set shares nothing with anything; this also rules out a zero
    # denominator below.
    if (not a) or (not b):
        return 0.

    overlap = len(a.intersection(b))

    if mode == "strict":
        return overlap / len(a.union(b))
    if mode == "relaxed":
        return overlap / min(len(a), len(b))
    if mode == "left":
        return overlap / len(a)
    return overlap / len(b)  # mode == "right"

In [None]:
from fastcore.test import *

# Two small token sets that share exactly one element ("ma").
left = set(["ala", "ma", "kota"])
right = set(["ola", "ma", "psa", "i", "papugę"])

# |left & right| = 1, |left | right| = 7, |left| = 3, |right| = 5
test_eq(jaccard_score(left, right, mode="strict"), 1/7)
test_eq(jaccard_score(left, right, mode="relaxed"), 1/3)
test_eq(jaccard_score(left, right, mode="left"), 1/3)
test_eq(jaccard_score(left, right, mode="right"), 1/5)

In [None]:
#| export

from typing import List, Tuple

def entity_coverage(
    ents_true: List[str], 
    ents_pred: List[str], 
    mode: str, 
    threshold: float=0.) -> Tuple[float, float]:
    """Measure how well predicted entities cover the annotated ones.

    Every annotated entity is compared, as a set of whitespace-separated
    tokens, with every predicted entity; its score is the best Jaccard score
    obtained. An annotated entity counts as matched when that best score is
    strictly greater than ``threshold``.

    Args:
        ents_true: entities from Chia annotations (may be empty or None)
        ents_pred: predicted entities (may be empty or None)
        mode: which version of the Jaccard coefficient to use
        threshold: best scores must exceed this value to count as a match

    Returns:
        ``(average best score of the matched entities, fraction of annotated
        entities that were matched)``; ``(0., 0.)`` when there is nothing to
        compare or nothing matched.
    """
    # Without predictions there is nothing to take a maximum over.
    if not ents_pred:
        return (0., 0.)

    best_scores = []
    for e_true in ents_true or []:  # a missing annotation list behaves like an empty one
        true_tokens = set(e_true.split())
        candidate_scores = [
            jaccard_score(true_tokens, set(e_pred.split()), mode=mode)
            for e_pred in ents_pred
        ]
        best_scores.append(max(candidate_scores))

    matched = list(filter(lambda score: score > threshold, best_scores))

    if not matched:
        return (0., 0.)

    average_score = sum(matched) / len(matched)
    matched_fraction = len(matched) / len(best_scores)
    return (average_score, matched_fraction)
        
        

In [None]:
from fastcore.test import *

# test basic usage of entity_coverage function
ents_true = ['adult', 'no alcohol substance abuse', 'cardiovascular disease', 'elevated cholesterol']
ents_pred = ['adult man or woman', 'no alcohol usage during last year', 'high blood pressure']

# Only the first two true entities share tokens with a prediction, so 2 of 4 are matched.
# strict: best scores are 1/4 and 2/8 -> average 0.25
test_eq(entity_coverage(ents_true, ents_pred, mode="strict"), (0.25, 0.5))
# relaxed: best scores are 1/1 and 2/4 -> average 0.75
test_eq(entity_coverage(ents_true, ents_pred, mode="relaxed"), (0.75, 0.5))

In [None]:
# test usage of entity_coverage when there are no true entities (empty list)
ents_true = []
ents_pred = ['adult man or woman', 'no alcohol usage during last year', 'high blood pressure']

test_eq(entity_coverage(ents_true, ents_pred, mode="strict"), (0., 0.))

# same check with None in place of the list of true entities
ents_true = None
ents_pred = ['adult man or woman', 'no alcohol usage during last year', 'high blood pressure']

test_eq(entity_coverage(ents_true, ents_pred, mode="strict"), (0., 0.))

In [None]:
# test usage of entity_coverage when there are no predicted entities (empty list)
ents_true = ['adult', 'no alcohol substance abuse', 'cardiovascular disease', 'elevated cholesterol']
ents_pred = []

test_eq(entity_coverage(ents_true, ents_pred, mode="strict"), (0., 0.))

# same check with None in place of the list of predicted entities
ents_true = ['adult', 'no alcohol substance abuse', 'cardiovascular disease', 'elevated cholesterol']
ents_pred = None

test_eq(entity_coverage(ents_true, ents_pred, mode="strict"), (0., 0.))

In [None]:
# | export


def get_criteria_with_entities(
    entity: str, n: int = None, random: bool = False
) -> List[Tuple[str, str]]:
    """Return ``(criteria, entities)`` pairs for criteria that have the given entity.

    Args:
        entity: name of the entity column of the eligibility-criteria frame,
            e.g. ``"drugs"``
        n: keep only the first ``n`` qualifying rows; ``None`` keeps all of them
        random: when True, shuffle the kept rows (the shuffle happens after the
            first ``n`` rows have been selected)

    Returns:
        A list of tuples, one per kept row, holding the criteria text and the
        value of the entity column.
    """
    df = load_eligibility_criteria()

    # Rows whose entity column is populated, cut down to the first n of them.
    has_entity = df[entity].notna()
    selected = df.loc[has_entity, ["criteria", entity]].iloc[:n]

    if random:
        selected = selected.sample(frac=1.0)

    return selected.to_records(index=False).tolist()

In [None]:
# Asking for the first 100 criteria that have drug annotations must yield exactly 100 pairs.
test_eq(len(get_criteria_with_entities("drugs", n=100)), 100)

In [None]:
def biogpt_prompt_ner(_text: str, entity: str) -> List[str]:
    """Generate a continuation of ``_text`` and return its distinct words.

    The prompt is encoded, passed to the module-level model ``m`` and the
    generated sequence is decoded again. The first ``len(decoded prompt)``
    characters are cut off, punctuation is removed, every occurrence of
    ``entity`` is deleted and the rest is split on whitespace.

    NOTE(review): ``m`` is not defined in this part of the file; it is
    presumably a BioGPT generation interface created elsewhere -- confirm.

    Args:
        _text: the full prompt
        entity: entity name that is removed from the generated text

    Returns:
        The distinct words of the continuation in order of first appearance,
        or an empty list when generation or post-processing fails.
    """
    import logging
    import string

    try:
        src_tokens = m.encode(_text)
        # The decoded output starts with the prompt; remember how much to cut.
        original_len = len(m.decode(src_tokens))
        generate = m.generate([src_tokens])[0]
        output = m.decode(generate[0]["tokens"])[original_len:]

        lst_output = (
            output.translate(str.maketrans("", "", string.punctuation))
            .replace(f"{entity}", "")
            .split()
        )

        # dict.fromkeys removes duplicates like set() but keeps a stable order.
        return list(dict.fromkeys(lst_output))
    except Exception as exc:
        # Best effort: a failed prediction counts as "no entities found", but
        # KeyboardInterrupt/SystemExit are no longer swallowed and the cause
        # is reported instead of disappearing silently.
        logging.getLogger(__name__).warning(
            "biogpt_prompt_ner failed, returning no entities: %r", exc
        )
        return []


def extract_ners(x: str) -> List[str]:
    """Split the string form of a list (e.g. ``"['a', 'b c']"``) into its items.

    Square brackets and single quotes are removed, a comma followed by one
    space is treated like a bare comma, and the result is split on commas.
    An input without any item yields ``[""]``.
    """
    # One pass that deletes the list punctuation characters.
    without_brackets = x.translate(str.maketrans("", "", "[]'"))
    compact = without_brackets.replace(", ", ",")
    return compact.split(",")

In [None]:
def prompt_predict(
    examples: List[Tuple[str, str]],
    entity: str,
    model: object,
    prompt_fun: Callable,
    n_shots: int = 1,
):
    """Predict entities for every criterion by prompting the language model.

    For each criterion a prompt is built with
    ``prompt_fun(criterion, examples, n_shots)`` and handed to
    ``biogpt_prompt_ner``; the words it returns are joined with single spaces.

    Args:
        examples: ``(criterion, entities)`` pairs passed on to ``prompt_fun``
        entity: entity name passed on to ``biogpt_prompt_ner``
        model: currently unused
        prompt_fun: callable that builds the prompt text
        n_shots: passed on to ``prompt_fun``

    Returns:
        One space-joined prediction string per criterion.

    NOTE(review): the loop reads ``criteria``, which is not a parameter and is
    not defined in this function -- presumably a notebook-level list of
    ``(criterion, entities)`` pairs; confirm, and consider passing it in.
    TODO: scoring the predictions per Jaccard mode with ``entity_coverage`` is
    not implemented here yet.
    """
    predictions = []

    for criterion, _annotated in criteria:
        prompt = prompt_fun(criterion, examples, n_shots)
        predicted_words = biogpt_prompt_ner(prompt, entity)
        predictions.append(" ".join(predicted_words))

    return predictions

In [None]:
#| hide
# Run nbdev's export step for this notebook project.
import nbdev; nbdev.nbdev_export()