## Prepare Environment

Import the required libraries and set the Apache Beam pipeline.

In [15]:
import string
import typing

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

pipeline = beam.Pipeline(InteractiveRunner())

## Pipeline Configuration

Get the pipeline options from the notebook arguments and set the default
values.

In [16]:
clinical_trials_dataset_path = (
    input("Clinical trials dataset path:")
    or "../data/python_and_data_engineering/clinical_trials.csv"
)
drugs_dataset_path = (
    input("Drugs dataset path:") or "../data/python_and_data_engineering/drugs.csv"
)
pubmed_dataset_path = (
    input("Pubmed dataset path:") or "../data/python_and_data_engineering/pubmed.csv"
)
pubmed_json_dataset_path = (
    input("Pubmed JSON dataset path:")
    or "../data/python_and_data_engineering/pubmed.json"
)
drug_mention_output_path = (
    input("Mention output dataset path:") or "../output/drug-mentions"
)

## Data Loading

Load the data from the CSV and JSON files provided by the options ahead.

In [17]:
class ClinicalTrial(typing.NamedTuple):
    id: str
    title: str
    date: str
    journal: str


clinical_trials = pipeline | beam.io.ReadFromCsv(
    clinical_trials_dataset_path,
    header=0,
    names=["id", "title", "date", "journal"],
    dtype={
        "id": str,
        "title": str,
        "date": str,
        "journal": str,
    },
    skip_blank_lines=True,
    parse_dates=["date"],
    infer_datetime_format=True,
    dayfirst=True,
    cache_dates=True,
).with_output_types(ClinicalTrial)

ib.collect(clinical_trials)

Unnamed: 0,id,title,date,journal
0,NCT01967433,Use of Diphenhydramine as an Adjunctive Sedati...,2020-01-01,Journal of emergency nursing
1,NCT04189588,Phase 2 Study IV QUZYTTIR™ (Cetirizine Hydroch...,2020-01-01,Journal of emergency nursing
2,NCT04237090,,2020-01-01,Journal of emergency nursing
3,NCT04237091,Feasibility of a Randomized Controlled Clinica...,2020-01-01,Journal of emergency nursing
4,NCT04153396,Preemptive Infiltration With Betamethasone and...,2020-01-01,Hôpitaux Universitaires de Genève
5,NCT03490942,Glucagon Infusion in T1D Patients With Recurre...,2020-05-25,
6,,Glucagon Infusion in T1D Patients With Recurre...,2020-05-25,Journal of emergency nursing
7,NCT04188184,Tranexamic Acid Versus Epinephrine During Expl...,2020-04-27,Journal of emergency nursing\xc3\x28


- column can be `null`
- scientific title can be empty
- date are inconsistent between `DD/MM/YYYY` and `DD Month YYYY`
- journal can be `null`

In [18]:
class Drug(typing.NamedTuple):
    id: str
    name: str


drugs = pipeline | "ReadDrugs" >> beam.io.ReadFromCsv(
    drugs_dataset_path,
    header=0,
    names=["id", "name"],
    dtype={"id": str, "name": str},
    skip_blank_lines=True,
).with_output_types(Drug)

ib.collect(drugs)

Unnamed: 0,id,name
0,A04AD,DIPHENHYDRAMINE
1,S03AA,TETRACYCLINE
2,V03AB,ETHANOL
3,A03BA,ATROPINE
4,A01AD,EPINEPHRINE
5,6302001,ISOPRENALINE
6,R01AD,BETAMETHASONE


In [19]:
class Pubmed(typing.NamedTuple):
    id: str
    title: str
    date: str
    journal: str


pubmed = pipeline | "ReadPubmed" >> beam.io.ReadFromCsv(
    pubmed_dataset_path,
    header=0,
    names=["id", "title", "date", "journal"],
    dtype={
        "id": str,
        "title": str,
        "date": str,
        "journal": str,
    },
    skip_blank_lines=True,
    parse_dates=["date"],
    infer_datetime_format=True,
    dayfirst=True,
    cache_dates=True,
).with_output_types(Pubmed)

pubmed_json = pipeline | "ReadPubmedJson" >> beam.io.ReadFromJson(
    pubmed_json_dataset_path,
    orient="records",
    lines=False,
    dtype={
        "id": str,
        "title": str,
        "date": "datetime64[ns]",
        "journal": str,
    },
).with_output_types(Pubmed)

pubmed = (pubmed, pubmed_json) | beam.Flatten().with_output_types(Pubmed)

ib.collect(pubmed)

Unnamed: 0,id,title,date,journal
0,1.0,A 44-year-old man with erythema of the face di...,2019-01-01,Journal of emergency nursing
1,2.0,"An evaluation of benadryl, pyribenzamine, and ...",2019-01-01,Journal of emergency nursing
2,3.0,Diphenhydramine hydrochloride helps symptoms o...,2019-01-02,The Journal of pediatrics
3,4.0,Tetracycline Resistance Patterns of Lactobacil...,2020-01-01,Journal of food protection
4,5.0,Appositional Tetracycline bone formation rates...,2020-01-02,American journal of veterinary research
5,6.0,Rapid reacquisition of contextual fear followi...,2020-01-01,Psychopharmacology
6,7.0,The High Cost of Epinephrine Autoinjectors and...,2020-02-01,The journal of allergy and clinical immunology...
7,8.0,Time to epinephrine treatment is associated wi...,2020-03-01,The journal of allergy and clinical immunology...
8,9.0,Gold nanoparticles synthesized from Euphorbia ...,2020-01-01,"Journal of photochemistry and photobiology. B,..."
9,10.0,Clinical implications of umbilical artery Dopp...,2020-01-01,The journal of maternal-fetal & neonatal medicine


- id type are inconsistent between `int` and `str` and can be `null`
- date format is inconsistent between `YYYY-MM-DD` and `DD/MM/YYYY`

## Data Transformation

In addressing the Python exercise, my strategy for identifying drug mentions
within the Pubmed and ClinicalTrial corpus involves a straightforward yet
effective approach. I opted to search for the exact occurrence of drug names in
the text by splitting and converting every single word in the publications
dataset to uppercase.

However, it's essential to acknowledge potential challenges associated with this
approach. The simplicity of exact word matching may encounter difficulties when
dealing with variations such as different accents, languages, or typos. In
scenarios where the exact word may not align perfectly with the drug name, the
approach might yield false negatives or miss certain mentions.

For a more sophisticated solution, alternatives like Jaccard distance or
advanced language-aware vector distances could be explored. These approaches
would introduce a level of flexibility by considering potential similarities to
the topic, even if the drug name is not explicitly mentioned. This approach
becomes particularly valuable in overcoming challenges posed by language
variations, accents, or minor typos, providing a more robust and comprehensive
solution for identifying drug mentions within the corpus. But this approach is
vastly more compute intensive as simple table joints are no longer feasible.

In [20]:
def by_title_words_key(element):
    words = element.title.translate(str.maketrans("", "", string.punctuation)).split()
    for word in words:
        yield (word.upper(), element)

In [21]:
clinical_trials_by_word_keys = clinical_trials | beam.FlatMap(
    by_title_words_key
).with_output_types(typing.Tuple[str, ClinicalTrial])

ib.collect(clinical_trials_by_word_keys)

Unnamed: 0,0,1
0,USE,"(NCT01967433, Use of Diphenhydramine as an Adj..."
1,OF,"(NCT01967433, Use of Diphenhydramine as an Adj..."
2,DIPHENHYDRAMINE,"(NCT01967433, Use of Diphenhydramine as an Adj..."
3,AS,"(NCT01967433, Use of Diphenhydramine as an Adj..."
4,AN,"(NCT01967433, Use of Diphenhydramine as an Adj..."
...,...,...
90,VERSUS,"(NCT04188184, Tranexamic Acid Versus Epinephri..."
91,EPINEPHRINE,"(NCT04188184, Tranexamic Acid Versus Epinephri..."
92,DURING,"(NCT04188184, Tranexamic Acid Versus Epinephri..."
93,EXPLORATORY,"(NCT04188184, Tranexamic Acid Versus Epinephri..."


In [22]:
pubmed_words = pubmed | beam.FlatMap(by_title_words_key).with_output_types(
    typing.Tuple[str, Pubmed]
)

ib.collect(pubmed_words)

Unnamed: 0,0,1
0,A,"(1, A 44-year-old man with erythema of the fac..."
1,44YEAROLD,"(1, A 44-year-old man with erythema of the fac..."
2,MAN,"(1, A 44-year-old man with erythema of the fac..."
3,WITH,"(1, A 44-year-old man with erythema of the fac..."
4,ERYTHEMA,"(1, A 44-year-old man with erythema of the fac..."
...,...,...
185,OF,"(, Comparison of pressure BETAMETHASONE releas..."
186,UPPER,"(, Comparison of pressure BETAMETHASONE releas..."
187,TRAPEZIUS,"(, Comparison of pressure BETAMETHASONE releas..."
188,ATROPINE,"(, Comparison of pressure BETAMETHASONE releas..."


## Drug Mentions Detection

As outlined in the preceding section, my chosen approach involves searching for
the exact occurrence of drug names. To implement this strategy, the
`CoGroupByKey` operation is employed to join drug names with the publications
dataset. This operation, akin to a `JOIN` operation but more granular, produces
a `PCollection` comprising tuples containing the drug and the publication.

Subsequently, the `FlatMap` operation is utilized to iterate over the
publications, systematically searching for the exact occurrence of the drug name
in the text. If a match is identified, a tuple is generated, encompassing the
drug name and the corresponding publication ID. This process facilitates the
precise detection of drug mentions within the corpus, establishing a robust link
between drug names and the associated publications.

In [23]:
class Mention(typing.NamedTuple):
    drug_id: str
    drug_name: str
    publication_type: str
    publication_id: str
    publication_title: str
    publication_date: str
    publication_journal: str

In [24]:
def by_name_key(element):
    return (element.name, element)


drugs_by_word_keys = drugs | beam.Map(by_name_key).with_output_types(
    typing.Tuple[str, Drug]
)

ib.collect(drugs_by_word_keys)

Unnamed: 0,0,1
0,DIPHENHYDRAMINE,"(A04AD, DIPHENHYDRAMINE)"
1,TETRACYCLINE,"(S03AA, TETRACYCLINE)"
2,ETHANOL,"(V03AB, ETHANOL)"
3,ATROPINE,"(A03BA, ATROPINE)"
4,EPINEPHRINE,"(A01AD, EPINEPHRINE)"
5,ISOPRENALINE,"(6302001, ISOPRENALINE)"
6,BETAMETHASONE,"(R01AD, BETAMETHASONE)"


In [25]:
def unnest_clinical_trials_mentions(element):
    for clinical_trial in element[1]["clinical_trials"]:
        for drug in element[1]["drugs"]:
            yield Mention(
                drug_id=drug.id,
                drug_name=drug.name,
                publication_type="CLINICAL_TRIAL",
                publication_id=clinical_trial.id,
                publication_title=clinical_trial.title,
                publication_date=clinical_trial.date,
                publication_journal=clinical_trial.journal,
            )


clinical_trials_mentions = (
    {
        "clinical_trials": clinical_trials_by_word_keys,
        "drugs": drugs_by_word_keys,
    }
    | beam.CoGroupByKey()
    | beam.FlatMap(unnest_clinical_trials_mentions).with_output_types(Mention)
)

ib.collect(clinical_trials_mentions)

Unnamed: 0,drug_id,drug_name,publication_type,publication_id,publication_title,publication_date,publication_journal
0,A04AD,DIPHENHYDRAMINE,CLINICAL_TRIAL,NCT01967433,Use of Diphenhydramine as an Adjunctive Sedati...,2020-01-01,Journal of emergency nursing
1,A04AD,DIPHENHYDRAMINE,CLINICAL_TRIAL,NCT04189588,Phase 2 Study IV QUZYTTIR™ (Cetirizine Hydroch...,2020-01-01,Journal of emergency nursing
2,A04AD,DIPHENHYDRAMINE,CLINICAL_TRIAL,NCT04237091,Feasibility of a Randomized Controlled Clinica...,2020-01-01,Journal of emergency nursing
3,A01AD,EPINEPHRINE,CLINICAL_TRIAL,NCT04188184,Tranexamic Acid Versus Epinephrine During Expl...,2020-04-27,Journal of emergency nursing\xc3\x28
4,R01AD,BETAMETHASONE,CLINICAL_TRIAL,NCT04153396,Preemptive Infiltration With Betamethasone and...,2020-01-01,Hôpitaux Universitaires de Genève


In [26]:
def unnest_pubmed_mentions(element):
    for pubmed in element[1]["pubmed"]:
        for drug in element[1]["drugs"]:
            yield Mention(
                drug_id=drug.id,
                drug_name=drug.name,
                publication_type="PUBMED",
                publication_id=pubmed.id,
                publication_title=pubmed.title,
                publication_date=pubmed.date,
                publication_journal=pubmed.journal,
            )


drugs_pubmed_mentions = (
    {
        "pubmed": pubmed_words,
        "drugs": drugs_by_word_keys,
    }
    | beam.CoGroupByKey()
    | beam.FlatMap(unnest_pubmed_mentions).with_output_types(Mention)
)

ib.collect(drugs_pubmed_mentions)

Unnamed: 0,drug_id,drug_name,publication_type,publication_id,publication_title,publication_date,publication_journal
0,A04AD,DIPHENHYDRAMINE,PUBMED,1.0,A 44-year-old man with erythema of the face di...,2019-01-01,Journal of emergency nursing
1,A04AD,DIPHENHYDRAMINE,PUBMED,2.0,"An evaluation of benadryl, pyribenzamine, and ...",2019-01-01,Journal of emergency nursing
2,A04AD,DIPHENHYDRAMINE,PUBMED,3.0,Diphenhydramine hydrochloride helps symptoms o...,2019-01-02,The Journal of pediatrics
3,S03AA,TETRACYCLINE,PUBMED,4.0,Tetracycline Resistance Patterns of Lactobacil...,2020-01-01,Journal of food protection
4,S03AA,TETRACYCLINE,PUBMED,5.0,Appositional Tetracycline bone formation rates...,2020-01-02,American journal of veterinary research
5,S03AA,TETRACYCLINE,PUBMED,6.0,Rapid reacquisition of contextual fear followi...,2020-01-01,Psychopharmacology
6,V03AB,ETHANOL,PUBMED,6.0,Rapid reacquisition of contextual fear followi...,2020-01-01,Psychopharmacology
7,V03AB,ETHANOL,PUBMED,6.0,Rapid reacquisition of contextual fear followi...,2020-01-01,Psychopharmacology
8,A01AD,EPINEPHRINE,PUBMED,7.0,The High Cost of Epinephrine Autoinjectors and...,2020-02-01,The journal of allergy and clinical immunology...
9,A01AD,EPINEPHRINE,PUBMED,8.0,Time to epinephrine treatment is associated wi...,2020-03-01,The journal of allergy and clinical immunology...


In [27]:
drug_mentions = (
    clinical_trials_mentions,
    drugs_pubmed_mentions,
) | beam.Flatten().with_output_types(Mention)

ib.collect(drug_mentions)

Unnamed: 0,drug_id,drug_name,publication_type,publication_id,publication_title,publication_date,publication_journal
0,A04AD,DIPHENHYDRAMINE,PUBMED,1,A 44-year-old man with erythema of the face di...,2019-01-01,Journal of emergency nursing
1,A04AD,DIPHENHYDRAMINE,PUBMED,2,"An evaluation of benadryl, pyribenzamine, and ...",2019-01-01,Journal of emergency nursing
2,A04AD,DIPHENHYDRAMINE,PUBMED,3,Diphenhydramine hydrochloride helps symptoms o...,2019-01-02,The Journal of pediatrics
3,S03AA,TETRACYCLINE,PUBMED,4,Tetracycline Resistance Patterns of Lactobacil...,2020-01-01,Journal of food protection
4,S03AA,TETRACYCLINE,PUBMED,5,Appositional Tetracycline bone formation rates...,2020-01-02,American journal of veterinary research
5,S03AA,TETRACYCLINE,PUBMED,6,Rapid reacquisition of contextual fear followi...,2020-01-01,Psychopharmacology
6,V03AB,ETHANOL,PUBMED,6,Rapid reacquisition of contextual fear followi...,2020-01-01,Psychopharmacology
7,V03AB,ETHANOL,PUBMED,6,Rapid reacquisition of contextual fear followi...,2020-01-01,Psychopharmacology
8,A01AD,EPINEPHRINE,PUBMED,7,The High Cost of Epinephrine Autoinjectors and...,2020-02-01,The journal of allergy and clinical immunology...
9,A01AD,EPINEPHRINE,PUBMED,8,Time to epinephrine treatment is associated wi...,2020-03-01,The journal of allergy and clinical immunology...


## Write to JSON

Finally, the results are written to a JSON file. An important consideration
here is the output data format include a snapshot of the publication as the
content may be updated in the future. This approach ensures the results are
consistent with the original data and can be easily validated.

In [28]:
drug_mentions_write = drug_mentions | beam.io.WriteToJson(
    drug_mention_output_path,
    orient="records",
    date_format="iso",
    lines=True,
)

ib.show(drug_mentions_write)