In [21]:
import pyspark
import dxpy
import dxdata
import re
import pandas as pd
from pyspark.sql.functions import col, to_date

In [2]:
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

dispensed_database_name = dxpy.find_one_data_object(
    classname="database", name="app*", folder="/", name_mode="glob", describe=True
)["describe"]["name"]
dispensed_dataset_id = dxpy.find_one_data_object(
    typename="Dataset", name="app*.dataset", folder="/", name_mode="glob"
)["id"]

spark.sql("USE " + dispensed_database_name)

dataset = dxdata.load_dataset(id=dispensed_dataset_id)

In [None]:
# Init pyspark and load dataframe
OMOP_TABLE = "omop_drug_era"  # 'omop_drug_era', 'omop_drug_exposure'

# Load dose era table
ode = dataset[OMOP_TABLE]

field_names = [f.name for f in ode.fields]

df = ode.retrieve_fields(names=field_names, engine=dxdata.connect())
# df = df.drop("eid")

print(f"Number of entries {df.count()}")
df.drop("eid").show(3)

In [4]:
participant = dataset["participant"]

In [5]:
field_names = ["eid"] + [
    f.name
    for f in participant.find_fields(lambda f: bool(re.match(f"^p({53})(_|$)", f.name)))
]
vf = participant.retrieve_fields(names=field_names, engine=dxdata.connect())

In [10]:
vf.drop("eid").show(3)

+----------+------+----------+------+
|    p53_i0|p53_i1|    p53_i2|p53_i3|
+----------+------+----------+------+
|2008-12-19|  null|      null|  null|
|2006-03-21|  null|      null|  null|
|2008-04-07|  null|      null|  null|
|2010-06-22|  null|      null|  null|
|2009-04-08|  null|      null|  null|
|2007-07-20|  null|      null|  null|
|2008-02-13|  null|      null|  null|
|2008-04-10|  null|      null|  null|
|2007-12-06|  null|      null|  null|
|2009-07-27|  null|      null|  null|
|2008-07-04|  null|      null|  null|
|2009-12-11|  null|      null|  null|
|2010-07-01|  null|      null|  null|
|2007-08-13|  null|2019-10-02|  null|
|2008-04-03|  null|      null|  null|
|2008-12-17|  null|      null|  null|
|2008-02-23|  null|2022-01-17|  null|
|2008-05-19|  null|      null|  null|
|2008-06-26|  null|      null|  null|
|2008-01-22|  null|      null|  null|
+----------+------+----------+------+
only showing top 20 rows



In [8]:
df2 = df.join(vf, on="eid", how="inner")
df2.count()

19959413

In [None]:
df3 = df2.filter(col("p53_i2").isNotNull())
# df3 = (df3.withColumn("drug_era_start_date", col("drug_era_start_date").cast("date"))
#        .withColumn("drug_era_end_date", col("drug_era_end_date").cast("date"))
#        .withColumn("p53_i2", col("p53_i2").cast("date")))
df3 = (
    df3.withColumn(
        "drug_era_start_date", to_date(col("drug_era_start_date"), "MM/dd/yyyy")
    )
    .withColumn("drug_era_end_date", to_date(col("drug_era_end_date"), "MM/dd/yyyy"))
    .withColumn("p53_i2", to_date(col("p53_i2"), "yyyy-MM-dd"))
)
df3.printSchema()
# filtered_df = df3.filter(col("p53_i2") > col("drug_era_start_date"))

# Filter rows where p53_i2 is between drug_era_start_date and drug_era_end_date
filtered_df = df3.filter(
    col("p53_i2").between(col("drug_era_start_date"), col("drug_era_end_date"))
)

# Show the result of the filter
filtered_df.show()
filtered_df.count()

In [32]:
# Convert the filtered DataFrame to a Pandas DataFrame
pandas_df = filtered_df.toPandas()
pandas_df.loc[:, "drug_concept_id"] = pandas_df.loc[:, "drug_concept_id"].astype(int)
# Save the Pandas DataFrame to a CSV file
pandas_df.to_csv("filtered_data.csv", index=False)

  pandas_df.loc[:, "drug_concept_id"] = pandas_df.loc[:,"drug_concept_id"].astype(int)


In [36]:
de = pd.read_csv("dose_era_ingredients.tsv", sep="\t")
de.loc[:, "drug_concept_id"] = de.loc[:, "drug_concept_id"].astype(int)
de.set_index("drug_concept_id", inplace=True)

In [37]:
joined_df = pandas_df.join(de, on="drug_concept_id", how="inner")

In [39]:
joined_df.to_csv("joined_df.csv")

In [47]:
pd.Series(
    joined_df.loc[:, "concept_name"].value_counts()
).sum()  # .to_csv("drug_counts.csv")

1638

In [4]:
# Get RxNorm Ingredients (IN) and associated concept_codes
conc = spark.read.csv(
    "/resources/omop_concept.tsv", sep="\t", header=True, inferSchema=True
)
rx_ing = conc.filter(
    (conc["vocabulary_id"] == "RxNorm")
    & (conc["concept_class_id"] == "Ingredient")
    & (conc["invalid_reason"].isNull())
)
print(f"RxNorm Ingredients: {rx_ing.count()}")
print(rx_ing.columns)
rx_ing.show(3, truncate=False, vertical=False)

RxNorm Ingredients: 13137
['concept_id', 'concept_name', 'domain_id', 'vocabulary_id', 'concept_class_id', 'standard_concept', 'concept_code', 'valid_start_date', 'valid_end_date', 'invalid_reason']
+----------+------------------------------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+
|concept_id|concept_name                        |domain_id|vocabulary_id|concept_class_id|standard_concept|concept_code|valid_start_date|valid_end_date|invalid_reason|
+----------+------------------------------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+
|501343    |hepatitis B immune globulin         |Drug     |RxNorm       |Ingredient      |S               |26744       |1970-01-01      |2099-12-31    |null          |
|507832    |herpesvirus 3, human                |Drug     |RxNorm       |Ingredient      |S               |11131       |1970-01-0

In [45]:
pd.Series(joined_df.loc[:, "eid"].value_counts()).to_csv("person_counts.csv")

In [None]:
joined_df

In [5]:
# Get distinct drug_concept_ids

dci = df.select(["drug_concept_id"]).distinct()
print(f"Number of records before join: {dci.count()}")

dci = dci.join(rx_ing, dci["drug_concept_id"] == rx_ing["concept_id"], how="inner")
print(f"Number of records after join: {dci.count()}")

dci.show(5, truncate=False, vertical=False)

Number of records before join: 1620
Number of records after join: 1572
+---------------+----------+------------------------------------------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+
|drug_concept_id|concept_id|concept_name                                    |domain_id|vocabulary_id|concept_class_id|standard_concept|concept_code|valid_start_date|valid_end_date|invalid_reason|
+---------------+----------+------------------------------------------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+
|501343         |501343    |hepatitis B immune globulin                     |Drug     |RxNorm       |Ingredient      |S               |26744       |1970-01-01      |2099-12-31    |null          |
|509081         |509081    |meningococcal group C polysaccharide            |Drug     |RxNorm       |Ingredient      |S               |29503     

In [8]:
###
### RxNorm Ingredient
###

# Documentation here https://www.nlm.nih.gov/research/umls/rxnorm/docs/techdoc.html#conso
# See https://www.nlm.nih.gov/research/umls/rxnorm/docs/appendix5.html for TTY
# Essentially _c11 = SAB, Source abbreviation. _c12 = TTY, Term type in source, _c16 = SUPPRESS, where N is No

conso = spark.read.csv(
    "/resources/RXNCONSO.RRF", sep="|", header=False, inferSchema=True
)

concept_code_to_drug = conso.filter(
    (conso["_c11"] == "RXNORM") & (conso["_c12"] == "IN") & (conso["_c16"] == "N")
)


# Identify columns where all values are null
non_null_columns = [
    col_name
    for col_name in concept_code_to_drug.columns
    if concept_code_to_drug.filter(col(col_name).isNotNull()).count() > 0
]

# Select only the columns that are not entirely null
concept_code_to_drug = concept_code_to_drug.select(*non_null_columns)

print(f"Number of RxNorm IN concepts: {concept_code_to_drug.count()}")
concept_code_to_drug.show(3, truncate=False, vertical=False)

Number of RxNorm IN concepts: 14395
+---+---+--------+--------+---+------+----+----+----------------+----+----+
|_c0|_c1|_c7     |_c8     |_c9|_c11  |_c12|_c13|_c14            |_c16|_c17|
+---+---+--------+--------+---+------+----+----+----------------+----+----+
|44 |ENG|12251526|12251526|44 |RXNORM|IN  |44  |mesna           |N   |4096|
|61 |ENG|12254378|12254378|61 |RXNORM|IN  |61  |beta-alanine    |N   |4096|
|73 |ENG|12252051|12252051|73 |RXNORM|IN  |73  |docosahexaenoate|N   |4096|
+---+---+--------+--------+---+------+----+----+----------------+----+----+
only showing top 3 rows



In [9]:
dci2 = dci.join(
    concept_code_to_drug,
    dci["concept_code"] == concept_code_to_drug["_c0"],
    how="inner",
)
print(dci2.count())  # We lose Senna Leaves which is obsolete
dci2.show(3, truncate=False, vertical=False)

1568
+---------------+----------+-------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+---+---+--------+--------+---+------+----+----+-------------+----+----+
|drug_concept_id|concept_id|concept_name |domain_id|vocabulary_id|concept_class_id|standard_concept|concept_code|valid_start_date|valid_end_date|invalid_reason|_c0|_c1|_c7     |_c8     |_c9|_c11  |_c12|_c13|_c14         |_c16|_c17|
+---------------+----------+-------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+---+---+--------+--------+---+------+----+----+-------------+----+----+
|1319998        |1319998   |acebutolol   |Drug     |RxNorm       |Ingredient      |S               |149         |1970-01-01      |2099-12-31    |null          |149|ENG|12253229|12253229|149|RXNORM|IN  |149 |acebutolol   |N   |4096|
|19024063       |19024063  |acenocoumarol|Drug     |RxNorm       |I

In [10]:
print(f"Number of entries original: {df.count()}")

print(
    f"Number of entries finished: {df.join(dci2, on = 'drug_concept_id', how = 'inner').count()}"
)

Number of entries original: 19959413
Number of entries finished: 19833183


In [11]:
dci.join(dci2, on="drug_concept_id", how="left_anti").show()

+---------------+----------+--------------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+
|drug_concept_id|concept_id|        concept_name|domain_id|vocabulary_id|concept_class_id|standard_concept|concept_code|valid_start_date|valid_end_date|invalid_reason|
+---------------+----------+--------------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+
|       42899447|  42899447|       insulin, pork|     Drug|       RxNorm|      Ingredient|               S|     1309342|      2012-10-01|    2099-12-31|          null|
|         992409|    992409|        Senna leaves|     Drug|       RxNorm|      Ingredient|               S|      237929|      1970-01-01|    2099-12-31|          null|
|       19058973|  19058973|calcium phosphate...|     Drug|       RxNorm|      Ingredient|               S|       47627|      1970-01-01|    2099-12-31|        

In [12]:
dci2.toPandas().to_csv("drug_era_ingredients.tsv", sep="\t")

In [13]:
###
### ATC codes
###
conso = spark.read.csv(
    "/resources/RXNCONSO.RRF", sep="|", header=False, inferSchema=True
)

concept_code_to_atc = conso.filter(
    (conso["_c11"] == "ATC") & (conso["_c12"] == "IN") & (conso["_c16"] == "N")
)


# Identify columns where all values are null
non_null_columns = [
    col_name
    for col_name in concept_code_to_atc.columns
    if concept_code_to_atc.filter(col(col_name).isNotNull()).count() > 0
]

# Select only the columns that are not entirely null
concept_code_to_atc = concept_code_to_atc.select(*non_null_columns)

print(f"Number of ATC codes: {concept_code_to_atc.count()}")
concept_code_to_atc.show(3, truncate=False, vertical=False)

Number of ATC codes: 5495
+---+---+-------+----+----+-------+--------+----+
|_c0|_c1|_c7    |_c11|_c12|_c13   |_c14    |_c16|
+---+---+-------+----+----+-------+--------+----+
|44 |ENG|5481074|ATC |IN  |R05CB05|mesna   |N   |
|44 |ENG|5481075|ATC |IN  |V03AF01|mesna   |N   |
|63 |ENG|5481013|ATC |IN  |C10AX05|meglutol|N   |
+---+---+-------+----+----+-------+--------+----+
only showing top 3 rows



In [14]:
dci3 = dci.join(
    concept_code_to_atc, dci["concept_code"] == concept_code_to_atc["_c0"], how="inner"
)

print(dci3.count())
dci3.show(3, truncate=False, vertical=False)

1617
+---------------+----------+-------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+---+---+-------+----+----+-------+-------------+----+
|drug_concept_id|concept_id|concept_name |domain_id|vocabulary_id|concept_class_id|standard_concept|concept_code|valid_start_date|valid_end_date|invalid_reason|_c0|_c1|_c7    |_c11|_c12|_c13   |_c14         |_c16|
+---------------+----------+-------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+---+---+-------+----+----+-------+-------------+----+
|1319998        |1319998   |acebutolol   |Drug     |RxNorm       |Ingredient      |S               |149         |1970-01-01      |2099-12-31    |null          |149|ENG|5478462|ATC |IN  |C07AB04|acebutolol   |N   |
|19024063       |19024063  |acenocoumarol|Drug     |RxNorm       |Ingredient      |S               |154         |1970-01-01      |2099-12-3

In [15]:
print(f"Number of entries original: {df.count()}")

print(
    f"Number of entries finished: {df.join(dci3, on = 'drug_concept_id', how = 'inner').count()}"
)
print(
    f"Number of distinct finished: {df.join(dci3, on = 'drug_concept_id', how = 'inner').select('drug_concept_id').distinct().count()}"
)

Number of entries original: 19959413
Number of entries finished: 38757289
Number of distinct finished: 1141


In [16]:
atc_missing = dci.join(dci3, on="drug_concept_id", how="left_anti")

print(f"Missing ATC: {atc_missing.count()}")
print(
    f"Records with ATC codes: {df.join(atc_missing, on = 'drug_concept_id', how = 'left_anti').count()}"
)


SHOW_MISSING = False
N_SHOW = 10

if SHOW_MISSING:
    dci_anti = atc_missing.show(N_SHOW)

Missing ATC: 431
Records with ATC codes: 17348853


In [18]:
dci3.toPandas().to_csv("drug_era_atc.tsv", sep="\t")

In [17]:
###
### DrugBank
###
conso = spark.read.csv(
    "/resources/RXNCONSO.RRF", sep="|", header=False, inferSchema=True
)

concept_code_to_db = conso.filter(
    (conso["_c11"] == "DRUGBANK") & (conso["_c12"] == "IN") & (conso["_c16"] == "N")
)


# Identify columns where all values are null
non_null_columns = [
    col_name
    for col_name in concept_code_to_db.columns
    if concept_code_to_db.filter(col(col_name).isNotNull()).count() > 0
]

# Select only the columns that are not entirely null
concept_code_to_db = concept_code_to_db.select(*non_null_columns)

print(f"Number of DrugBank identifier: {concept_code_to_db.count()}")
concept_code_to_db.show(3, truncate=False, vertical=False)

Number of DrugBank identifier: 10715
+---+---+--------+-------+--------+----+-------+------------------------------+----+
|_c0|_c1|_c7     |_c9    |_c11    |_c12|_c13   |_c14                          |_c16|
+---+---+--------+-------+--------+----+-------+------------------------------+----+
|48 |ENG|12855631|DB18029|DRUGBANK|IN  |DB18029|24,25-Dihydroxycholecalciferol|N   |
|60 |ENG|10894803|DB01509|DRUGBANK|IN  |DB01509|Tenamfetamine                 |N   |
|74 |ENG|9733930 |DB02362|DRUGBANK|IN  |DB02362|Aminobenzoic acid             |N   |
+---+---+--------+-------+--------+----+-------+------------------------------+----+
only showing top 3 rows



In [18]:
dci4 = dci.join(
    concept_code_to_db, dci["concept_code"] == concept_code_to_db["_c0"], how="inner"
)

print(dci4.count())
dci4.show(3, truncate=False, vertical=False)

1280
+---------------+----------+-------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+---+---+-------+-------+--------+----+-------+-------------+----+
|drug_concept_id|concept_id|concept_name |domain_id|vocabulary_id|concept_class_id|standard_concept|concept_code|valid_start_date|valid_end_date|invalid_reason|_c0|_c1|_c7    |_c9    |_c11    |_c12|_c13   |_c14         |_c16|
+---------------+----------+-------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+---+---+-------+-------+--------+----+-------+-------------+----+
|1319998        |1319998   |acebutolol   |Drug     |RxNorm       |Ingredient      |S               |149         |1970-01-01      |2099-12-31    |null          |149|ENG|8575333|DB01193|DRUGBANK|IN  |DB01193|Acebutolol   |N   |
|19024063       |19024063  |acenocoumarol|Drug     |RxNorm       |Ingredient      |S       

In [19]:
print(f"Number of entries original: {df.count()}")
print(
    f"Number of entries finished: {df.join(dci4, on = 'drug_concept_id', how = 'inner').count()}"
)
print(
    f"Number of distinct finished: {df.join(dci4, on = 'drug_concept_id', how = 'inner').select('drug_concept_id').distinct().count()}"
)

drugbank_missing = dci.join(dci4, on="drug_concept_id", how="left_anti")

print(f"Missing DrugBank ID: {drugbank_missing.count()}")
print(
    f"Records with DrugBank codes: {df.join(drugbank_missing, on = 'drug_concept_id', how = 'left_anti').count()}"
)


SHOW_MISSING = True
N_SHOW = 10

if SHOW_MISSING:
    dci_anti = drugbank_missing.show(N_SHOW)

Number of entries original: 19959413
Number of entries finished: 18211458
Number of distinct finished: 1271
Missing DrugBank ID: 301
Records with DrugBank codes: 18192505
+---------------+----------+------------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+
|drug_concept_id|concept_id|      concept_name|domain_id|vocabulary_id|concept_class_id|standard_concept|concept_code|valid_start_date|valid_end_date|invalid_reason|
+---------------+----------+------------------+---------+-------------+----------------+----------------+------------+----------------+--------------+--------------+
|         790253|    790253|       clorazepate|     Drug|       RxNorm|      Ingredient|               S|        2353|      1970-01-01|    2099-12-31|          null|
|         900017|    900017|   cascara sagrada|     Drug|       RxNorm|      Ingredient|               S|       66869|      1970-01-01|    2099-12-31|          null|

In [20]:
dci4.toPandas().to_csv("drug_era_drugbank.tsv", sep="\t")

In [21]:
###
### Chebi / OMOP2OBO
###

chebi = pd.read_excel(
    "OMOP2OBO_V1_Drug_Exposure_Mapping_Oct2020.xlsx",
    sheet_name="OMOP2OBO_ChEBI_Mapping_Results",
)
chebi["ONTOLOGY_LOGIC"] = chebi["ONTOLOGY_LOGIC"].astype(str)
chebi = spark.createDataFrame(chebi)

  for column, series in pdf.iteritems():


In [22]:
dci = df.select(["drug_concept_id"]).distinct()
dci5 = dci.join(chebi, dci["drug_concept_id"] == chebi["CONCEPT_ID"], how="inner")
print(dci5.count())
dci5.show()

1562
+---------------+----------+------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|drug_concept_id|CONCEPT_ID|CONCEPT_CODE|        CONCEPT_NAME|ONTOLOGY_LOGIC|        ONTOLOGY_URI|      ONTOLOGY_LABEL|    MAPPING_CATEGORY|    MAPPING_EVIDENCE|
+---------------+----------+------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|         523283|    523283|    763096.0|poliovirus vaccin...|           nan|         CHEBI_60816|           immunogen|Manual One-to-One...|Source:DrugBank_D...|
|         529303|    529303|    798304.0|diphtheria toxoid...|           nan|         CHEBI_60816|           immunogen|Manual One-to-One...|        Hand Mapping|
|         529411|    529411|    798306.0|tetanus toxoid va...|           nan|         CHEBI_60816|           immunogen|Manual One-to-One...|Source:DrugBank_D...|
|         532881|    53

In [23]:
dci5.toPandas().to_csv("drug_era_chebi.tsv", sep="\t")